diff options
author | Roger Dingledine <arma@torproject.org> | 2002-07-18 06:37:58 +0000 |
---|---|---|
committer | Roger Dingledine <arma@torproject.org> | 2002-07-18 06:37:58 +0000 |
commit | 267434bdeac40a2ccc2677119ddc1925b80c0c4c (patch) | |
tree | 27ce149ec317584dddb923f8e7fd4544baf59d15 /src/or | |
parent | ccdef66b68a2f61dfe600fddafaf270537928fac (diff) | |
download | tor-267434bdeac40a2ccc2677119ddc1925b80c0c4c.tar.gz tor-267434bdeac40a2ccc2677119ddc1925b80c0c4c.zip |
Implemented congestion control
Servers are allowed to send 100 cells initially, and can't send more until
they receive a 'sendme' cell from that direction, indicating that they
can send 10 more cells. As it currently stands, the exit node quickly
runs out of window, and sends bursts of 10 whenever a sendme cell gets
to him. This is much much much faster (and more flexible) than the old
"give each circuit 1 kB/s and hope nothing overflows" approach.
Also divided out the connection_watch_events into stop_reading,
start_writing, etc. That way we can control them separately.
svn:r54
Diffstat (limited to 'src/or')
-rw-r--r-- | src/or/buffers.c | 18 | ||||
-rw-r--r-- | src/or/circuit.c | 3 | ||||
-rw-r--r-- | src/or/command.c | 78 | ||||
-rw-r--r-- | src/or/connection.c | 120 | ||||
-rw-r--r-- | src/or/connection_ap.c | 20 | ||||
-rw-r--r-- | src/or/connection_exit.c | 20 | ||||
-rw-r--r-- | src/or/connection_op.c | 2 | ||||
-rw-r--r-- | src/or/connection_or.c | 5 | ||||
-rw-r--r-- | src/or/main.c | 35 | ||||
-rw-r--r-- | src/or/or.h | 13 |
10 files changed, 244 insertions, 70 deletions
diff --git a/src/or/buffers.c b/src/or/buffers.c index a14cf0d977..d5c35aa4f5 100644 --- a/src/or/buffers.c +++ b/src/or/buffers.c @@ -26,26 +26,26 @@ void buf_free(char *buf) { int read_to_buf(int s, int at_most, char **buf, size_t *buflen, size_t *buf_datalen, int *reached_eof) { - /* read from socket s, writing onto buf+buf_datalen. Read at most - * 'at_most' bytes, and also don't read more than will fit based on buflen. + /* read from socket s, writing onto buf+buf_datalen. If at_most is >= 0 then + * read at most 'at_most' bytes, and in any case don't read more than will fit based on buflen. * If read() returns 0, set *reached_eof to 1 and return 0. If you want to tear * down the connection return -1, else return the number of bytes read. */ int read_result; - assert(buf && *buf && buflen && buf_datalen && reached_eof && (s>=0) && (at_most >= 0)); + assert(buf && *buf && buflen && buf_datalen && reached_eof && (s>=0)); /* this is the point where you would grow the buffer, if you want to */ - if(*buflen - *buf_datalen < at_most) + if(at_most < 0 || *buflen - *buf_datalen < at_most) at_most = *buflen - *buf_datalen; /* take the min of the two */ /* (note that this only modifies at_most inside this function) */ if(at_most == 0) return 0; /* we shouldn't read anything */ - log(LOG_DEBUG,"read_to_buf(): reading at most %d bytes.",at_most); +// log(LOG_DEBUG,"read_to_buf(): reading at most %d bytes.",at_most); read_result = read(s, *buf+*buf_datalen, at_most); if (read_result < 0) { if(errno!=EAGAIN) { /* it's a real error */ @@ -58,7 +58,7 @@ int read_to_buf(int s, int at_most, char **buf, size_t *buflen, size_t *buf_data return 0; } else { /* we read some bytes */ *buf_datalen += read_result; - log(LOG_DEBUG,"read_to_buf(): Read %d bytes. %d on inbuf.",read_result, *buf_datalen); +// log(LOG_DEBUG,"read_to_buf(): Read %d bytes. %d on inbuf.",read_result, *buf_datalen); return read_result; } @@ -90,8 +90,8 @@ int flush_buf(int s, char **buf, size_t *buflen, size_t *buf_flushlen, size_t *b *buf_datalen -= write_result; *buf_flushlen -= write_result; memmove(*buf, *buf+write_result, *buf_datalen); - log(LOG_DEBUG,"flush_buf(): flushed %d bytes, %d ready to flush, %d remain.", - write_result,*buf_flushlen,*buf_datalen); +// log(LOG_DEBUG,"flush_buf(): flushed %d bytes, %d ready to flush, %d remain.", +// write_result,*buf_flushlen,*buf_datalen); return *buf_flushlen; } } @@ -114,7 +114,7 @@ int write_to_buf(char *string, size_t string_len, memcpy(*buf+*buf_datalen, string, string_len); *buf_datalen += string_len; - log(LOG_DEBUG,"write_to_buf(): added %d bytes to buf (now %d total).",string_len, *buf_datalen); +// log(LOG_DEBUG,"write_to_buf(): added %d bytes to buf (now %d total).",string_len, *buf_datalen); return *buf_datalen; } diff --git a/src/or/circuit.c b/src/or/circuit.c index eeca48cbe9..c2c91338ca 100644 --- a/src/or/circuit.c +++ b/src/or/circuit.c @@ -57,6 +57,9 @@ circuit_t *circuit_new(aci_t p_aci, connection_t *p_conn) { circ->p_aci = p_aci; /* circ->n_aci remains 0 because we haven't identified the next hop yet */ + circ->n_receive_window = RECEIVE_WINDOW_START; + circ->p_receive_window = RECEIVE_WINDOW_START; + circuit_add(circ); return circ; diff --git a/src/or/command.c b/src/or/command.c index 3f3460d889..5a1ad4155d 100644 --- a/src/or/command.c +++ b/src/or/command.c @@ -22,6 +22,12 @@ void command_process_cell(cell_t *cell, connection_t *conn) { case CELL_DESTROY: command_process_destroy_cell(cell, conn); break; + case CELL_SENDME: + command_process_sendme_cell(cell, conn); + break; + default: + log(LOG_DEBUG,"Cell of unknown type (%d) received. Dropping.", cell->command); + break; } } @@ -147,6 +153,8 @@ void command_process_create_cell(cell_t *cell, connection_t *conn) { return; } n_conn->state = EXIT_CONN_STATE_CONNECTING_WAIT; + n_conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */ + n_conn->bandwidth = -1; n_conn->s = -1; /* not yet valid */ if(connection_add(n_conn) < 0) { /* no space, forget it */ log(LOG_DEBUG,"command_process_create_cell(): connection_add failed. Closing."); @@ -159,15 +167,65 @@ void command_process_create_cell(cell_t *cell, connection_t *conn) { } } -void command_process_data_cell(cell_t *cell, connection_t *conn) { +void command_process_sendme_cell(cell_t *cell, connection_t *conn) { circuit_t *circ; - /* FIXME do something with 'close' state, here */ + circ = circuit_get_by_aci_conn(cell->aci, conn); + + if(!circ) { + log(LOG_DEBUG,"command_process_sendme_cell(): unknown circuit %d. Dropping.", cell->aci); + return; + } + + if(circ->state == CIRCUIT_STATE_OPEN_WAIT) { + log(LOG_DEBUG,"command_process_sendme_cell(): circuit in open_wait. Dropping."); + return; + } + if(circ->state == CIRCUIT_STATE_OR_WAIT) { + log(LOG_DEBUG,"command_process_sendme_cell(): circuit in or_wait. Dropping."); + return; + } + + /* at this point both circ->n_conn and circ->p_conn are guaranteed to be set */ + + assert(cell->length == RECEIVE_WINDOW_INCREMENT); + + if(cell->aci == circ->p_aci) { /* it's an outgoing cell */ + circ->n_receive_window += cell->length; + log(LOG_DEBUG,"connection_process_sendme_cell(): n_receive_window for aci %d is %d.",circ->n_aci,circ->n_receive_window); + if(circ->n_conn->type == CONN_TYPE_EXIT) { + connection_start_reading(circ->n_conn); + connection_package_raw_inbuf(circ->n_conn); /* handle whatever might still be on the inbuf */ + } else { + cell->aci = circ->n_aci; /* switch it */ + if(connection_write_cell_to_buf(cell, circ->n_conn) < 0) { /* (clobbers cell) */ + circuit_close(circ); + return; + } + } + } else { /* it's an ingoing cell */ + circ->p_receive_window += cell->length; + log(LOG_DEBUG,"connection_process_sendme_cell(): p_receive_window for aci %d is %d.",circ->p_aci,circ->p_receive_window); + if(circ->p_conn->type == CONN_TYPE_AP) { + connection_start_reading(circ->p_conn); + connection_package_raw_inbuf(circ->p_conn); /* handle whatever might still be on the inbuf */ + } else { + cell->aci = circ->p_aci; /* switch it */ + if(connection_write_cell_to_buf(cell, circ->p_conn) < 0) { /* (clobbers cell) */ + circuit_close(circ); + return; + } + } + } +} + +void command_process_data_cell(cell_t *cell, connection_t *conn) { + circuit_t *circ; circ = circuit_get_by_aci_conn(cell->aci, conn); if(!circ) { - log(LOG_DEBUG,"command_process_data_cell(): received DATA cell for unknown circuit. Dropping."); + log(LOG_DEBUG,"command_process_data_cell(): unknown circuit %d. Dropping.", cell->aci); return; } @@ -184,6 +242,12 @@ void command_process_data_cell(cell_t *cell, connection_t *conn) { if(cell->aci == circ->p_aci) { /* it's an outgoing cell */ cell->aci = circ->n_aci; /* switch it */ + if(--circ->p_receive_window < 0) { /* is it less than 0 after decrement? */ + log(LOG_DEBUG,"connection_process_data_cell(): Too many data cells on aci %d. Closing.", circ->p_aci); + circuit_close(circ); + return; + } + log(LOG_DEBUG,"connection_process_data_cell(): p_receive_window for aci %d is %d.",circ->p_aci,circ->p_receive_window); if(circuit_deliver_data_cell(cell, circ, circ->n_conn, 'd') < 0) { log(LOG_DEBUG,"command_process_data_cell(): circuit_deliver_data_cell (forward) failed. Closing."); circuit_close(circ); @@ -191,6 +255,12 @@ void command_process_data_cell(cell_t *cell, connection_t *conn) { } } else { /* it's an ingoing cell */ cell->aci = circ->p_aci; /* switch it */ + if(--circ->n_receive_window < 0) { /* is it less than 0 after decrement? */ + log(LOG_DEBUG,"connection_process_data_cell(): Too many data cells on aci %d. Closing.", circ->n_aci); + circuit_close(circ); + return; + } + log(LOG_DEBUG,"connection_process_data_cell(): n_receive_window for aci %d is %d.",circ->n_aci,circ->n_receive_window); if(circ->p_conn->type == CONN_TYPE_AP) { /* we want to decrypt, not encrypt */ if(circuit_deliver_data_cell(cell, circ, circ->p_conn, 'd') < 0) { log(LOG_DEBUG,"command_process_data_cell(): circuit_deliver_data_cell (backward to AP) failed. Closing."); @@ -213,7 +283,7 @@ void command_process_destroy_cell(cell_t *cell, connection_t *conn) { circ = circuit_get_by_aci_conn(cell->aci, conn); if(!circ) { - log(LOG_DEBUG,"command_process_destroy_cell(): received DESTROY cell for unknown circuit. Dropping."); + log(LOG_DEBUG,"command_process_destroy_cell(): unknown circuit %d. Dropping.", cell->aci); return; } diff --git a/src/or/connection.c b/src/or/connection.c index 9b9727a5b0..e13ec348ad 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -101,9 +101,7 @@ void connection_free(connection_t *conn) { if(conn->dest_port) free(conn->dest_port); - /* FIXME should we do these for all connections, or just ORs, or what */ - if(conn->type == CONN_TYPE_OR || - conn->type == CONN_TYPE_OP) { + if(connection_speaks_cells(conn)) { EVP_CIPHER_CTX_cleanup(&conn->f_ctx); EVP_CIPHER_CTX_cleanup(&conn->b_ctx); } @@ -158,7 +156,7 @@ int connection_create_listener(RSA *prkey, struct sockaddr_in *local, int type) log(LOG_DEBUG,"connection_create_listener(): Listening on local port %u.",ntohs(local->sin_port)); conn->state = LISTENER_STATE_READY; - connection_watch_events(conn, POLLIN); + connection_start_reading(conn); return 0; } @@ -185,6 +183,11 @@ int connection_handle_listener_read(connection_t *conn, int new_type, int new_st newconn = connection_new(new_type); newconn->s = news; + if(!connection_speaks_cells(newconn)) { + newconn->receiver_bucket = -1; + newconn->bandwidth = -1; + } + /* learn things from parent, so we can perform auth */ memcpy(&newconn->local,&conn->local,sizeof(struct sockaddr_in)); newconn->prkey = conn->prkey; @@ -197,7 +200,7 @@ int connection_handle_listener_read(connection_t *conn, int new_type, int new_st log(LOG_DEBUG,"connection_handle_listener_read(): socket %d entered state %d.",newconn->s, new_state); newconn->state = new_state; - connection_watch_events(newconn, POLLIN); + connection_start_reading(newconn); return 0; } @@ -284,13 +287,20 @@ connection_t *connection_connect_to_router_as_op(routerinfo_t *router, RSA *prke int connection_read_to_buf(connection_t *conn) { int read_result; + if(connection_speaks_cells(conn)) { + assert(conn->receiver_bucket >= 0); + } + if(!connection_speaks_cells(conn)) { + assert(conn->receiver_bucket < 0); + } read_result = read_to_buf(conn->s, conn->receiver_bucket, &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen, &conn->inbuf_reached_eof); - log(LOG_DEBUG,"connection_read_to_buf(): read_to_buf returned %d.",read_result); - if(read_result >= 0) { +// log(LOG_DEBUG,"connection_read_to_buf(): read_to_buf returned %d.",read_result); + if(read_result >= 0 && connection_speaks_cells(conn)) { conn->receiver_bucket -= read_result; if(conn->receiver_bucket <= 0) { +// log(LOG_DEBUG,"connection_read_to_buf() stopping reading, receiver bucket full."); connection_stop_reading(conn); /* If we're not in 'open' state here, then we're never going to finish the @@ -308,6 +318,14 @@ int connection_fetch_from_buf(char *string, int len, connection_t *conn) { return fetch_from_buf(string, len, &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen); } +int connection_wants_to_flush(connection_t *conn) { + return conn->outbuf_flushlen; +} + +int connection_outbuf_too_full(connection_t *conn) { + return (conn->outbuf_flushlen > 10*CELL_PAYLOAD_SIZE); +} + int connection_flush_buf(connection_t *conn) { return flush_buf(conn->s, &conn->outbuf, &conn->outbuflen, &conn->outbuf_flushlen, &conn->outbuf_datalen); } @@ -321,7 +339,7 @@ int connection_write_to_buf(char *string, int len, connection_t *conn) { (options.LinkPadding == 0) ) { /* connection types other than or and op, or or/op not in 'open' state, should flush immediately */ /* also flush immediately if we're not doing LinkPadding, since otherwise it will never flush */ - connection_watch_events(conn, POLLOUT | POLLIN); + connection_start_writing(conn); conn->outbuf_flushlen += len; } @@ -331,6 +349,9 @@ int connection_write_to_buf(char *string, int len, connection_t *conn) { int connection_receiver_bucket_should_increase(connection_t *conn) { assert(conn); + if(!connection_speaks_cells(conn)) + return 0; /* edge connections don't use receiver_buckets */ + if(conn->receiver_bucket > 10*conn->bandwidth) return 0; @@ -350,6 +371,15 @@ void connection_increment_receiver_bucket (connection_t *conn) { } } +int connection_speaks_cells(connection_t *conn) { + assert(conn); + + if(conn->type == CONN_TYPE_OR || conn->type == CONN_TYPE_OP) + return 1; + + return 0; +} + int connection_state_is_open(connection_t *conn) { assert(conn); @@ -371,7 +401,7 @@ void connection_send_cell(connection_t *conn) { assert(conn); - if(conn->type != CONN_TYPE_OR && conn->type != CONN_TYPE_OP) { + if(!connection_speaks_cells(conn)) { /* this conn doesn't speak cells. do nothing. */ return; } @@ -385,7 +415,7 @@ void connection_send_cell(connection_t *conn) { #if 0 /* use to send evenly spaced cells, but not padding */ if(conn->outbuf_datalen - conn->outbuf_flushlen >= sizeof(cell_t)) { conn->outbuf_flushlen += sizeof(cell_t); /* instruct it to send a cell */ - connection_watch_events(conn, POLLOUT | POLLIN); + connection_start_writing(conn); } #endif @@ -408,7 +438,7 @@ void connection_send_cell(connection_t *conn) { } conn->outbuf_flushlen += sizeof(cell_t); /* instruct it to send a cell */ - connection_watch_events(conn, POLLOUT | POLLIN); + connection_start_writing(conn); } @@ -434,16 +464,12 @@ int connection_send_destroy(aci_t aci, connection_t *conn) { assert(conn); - if(conn->type == CONN_TYPE_OP || - conn->type == CONN_TYPE_AP || - conn->type == CONN_TYPE_EXIT) { + if(!connection_speaks_cells(conn)) { log(LOG_DEBUG,"connection_send_destroy(): At an edge. Marking connection for close."); conn->marked_for_close = 1; return 0; } - assert(conn->type == CONN_TYPE_OR); - cell.aci = aci; cell.command = CELL_DESTROY; log(LOG_DEBUG,"connection_send_destroy(): Sending destroy (aci %d).",aci); @@ -452,7 +478,6 @@ int connection_send_destroy(aci_t aci, connection_t *conn) { } int connection_write_cell_to_buf(cell_t *cellp, connection_t *conn) { - /* FIXME in the future, we should modify windows, etc, here */ if(connection_encrypt_cell_header(cellp,conn)<0) { return -1; @@ -464,10 +489,10 @@ int connection_write_cell_to_buf(cell_t *cellp, connection_t *conn) { int connection_encrypt_cell_header(cell_t *cellp, connection_t *conn) { char newheader[8]; int newsize; +#if 0 int x; char *px; -#if 0 printf("Sending: Cell header plaintext: "); px = (char *)cellp; for(x=0;x<8;x++) { @@ -517,7 +542,8 @@ int connection_package_raw_inbuf(connection_t *conn) { circuit_t *circ; assert(conn); - assert(conn->type == CONN_TYPE_EXIT || conn->type == CONN_TYPE_AP); + assert(!connection_speaks_cells(conn)); + /* this function should never get called if the receiver_window is 0 */ amount_to_process = conn->inbuf_datalen; @@ -548,6 +574,13 @@ int connection_package_raw_inbuf(connection_t *conn) { circuit_close(circ); return 0; } + assert(circ->n_receive_window > 0); + if(--circ->n_receive_window <= 0) { /* is it 0 after decrement? */ + connection_stop_reading(circ->n_conn); + log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at exit reached 0."); + return 0; /* don't process the inbuf any more */ + } + log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at exit is %d",circ->n_receive_window); } else { /* send it forward. we're an AP */ cell.aci = circ->n_aci; cell.command = CELL_DATA; @@ -557,17 +590,58 @@ int connection_package_raw_inbuf(connection_t *conn) { circuit_close(circ); return 0; } + assert(circ->p_receive_window > 0); + if(--circ->p_receive_window <= 0) { /* is it 0 after decrement? */ + connection_stop_reading(circ->p_conn); + log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at AP reached 0."); + return 0; /* don't process the inbuf any more */ + } + log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at AP is %d",circ->p_receive_window); } if(amount_to_process > CELL_PAYLOAD_SIZE) + log(LOG_DEBUG,"connection_raw_package_inbuf(): recursing."); return connection_package_raw_inbuf(conn); return 0; } +int connection_consider_sending_sendme(connection_t *conn) { + circuit_t *circ; + cell_t sendme; + + if(connection_outbuf_too_full(conn)) + return 0; + + circ = circuit_get_by_conn(conn); + if(!circ) { + log(LOG_DEBUG,"connection_consider_sending_sendme(): Bug: no circuit associated with conn. Closing."); + return -1; + } + sendme.command = CELL_SENDME; + sendme.length = RECEIVE_WINDOW_INCREMENT; + + if(circ->n_conn == conn) { /* we're at an exit */ + if(circ->p_receive_window < RECEIVE_WINDOW_START-RECEIVE_WINDOW_INCREMENT) { + log(LOG_DEBUG,"connection_consider_sending_sendme(): Queueing sendme back."); + circ->p_receive_window += RECEIVE_WINDOW_INCREMENT; + sendme.aci = circ->p_aci; + return connection_write_cell_to_buf(&sendme, circ->p_conn); /* (clobbers sendme) */ + } + } else { /* we're at an AP */ + if(circ->n_receive_window < RECEIVE_WINDOW_START-RECEIVE_WINDOW_INCREMENT) { + log(LOG_DEBUG,"connection_consider_sending_sendme(): Queueing sendme forward."); + circ->n_receive_window += RECEIVE_WINDOW_INCREMENT; + sendme.aci = circ->n_aci; + return connection_write_cell_to_buf(&sendme, circ->n_conn); /* (clobbers sendme) */ + } + } + return 0; +} + int connection_finished_flushing(connection_t *conn) { assert(conn); - log(LOG_DEBUG,"connection_finished_flushing() entered. Socket %u.", conn->s); +// log(LOG_DEBUG,"connection_finished_flushing() entered. Socket %u.", conn->s); switch(conn->type) { case CONN_TYPE_AP: @@ -591,7 +665,7 @@ int connection_process_cell_from_inbuf(connection_t *conn) { char crypted[128]; char outbuf[1024]; int outlen; - int x; +// int x; cell_t *cellp; if(conn->inbuf_datalen < 128) /* entire response available? */ @@ -613,7 +687,7 @@ int connection_process_cell_from_inbuf(connection_t *conn) { log(LOG_ERR,"connection_process_cell_from_inbuf(): Decryption failed, dropping."); return connection_process_inbuf(conn); /* process the remainder of the buffer */ } - log(LOG_DEBUG,"connection_process_cell_from_inbuf(): Cell decrypted (%d bytes).",outlen); +// log(LOG_DEBUG,"connection_process_cell_from_inbuf(): Cell decrypted (%d bytes).",outlen); #if 0 printf("Cell header plaintext: "); for(x=0;x<8;x++) { @@ -625,7 +699,7 @@ int connection_process_cell_from_inbuf(connection_t *conn) { /* copy the rest of the cell */ memcpy((char *)outbuf+8, (char *)crypted+8, sizeof(cell_t)-8); cellp = (cell_t *)outbuf; - log(LOG_DEBUG,"connection_process_cell_from_inbuf(): Decrypted cell is of type %u (ACI %u).",cellp->command,cellp->aci); +// log(LOG_DEBUG,"connection_process_cell_from_inbuf(): Decrypted cell is of type %u (ACI %u).",cellp->command,cellp->aci); command_process_cell(cellp, conn); return connection_process_inbuf(conn); /* process the remainder of the buffer */ diff --git a/src/or/connection_ap.c b/src/or/connection_ap.c index 95f60445dd..ce4fdcd89b 100644 --- a/src/or/connection_ap.c +++ b/src/or/connection_ap.c @@ -343,14 +343,17 @@ int connection_ap_process_data_cell(cell_t *cell, connection_t *conn) { assert(conn && conn->type == CONN_TYPE_AP); - if(conn->state == AP_CONN_STATE_OPEN) { - log(LOG_DEBUG,"connection_ap_process_data_cell(): In state 'open', writing to buf."); - return connection_write_to_buf(cell->payload, cell->length, conn); + if(conn->state != AP_CONN_STATE_OPEN) { + /* we should not have gotten this cell */ + log(LOG_DEBUG,"connection_ap_process_data_cell(): Got a data cell when not in 'open' state. Closing."); + return -1; } - /* else we shouldn't have gotten this cell */ - log(LOG_DEBUG,"connection_ap_process_data_cell(): Got a data cell when not in 'open' state. Closing."); - return -1; + log(LOG_DEBUG,"connection_ap_process_data_cell(): In state 'open', writing to buf."); + + if(connection_write_to_buf(cell->payload, cell->length, conn) < 0) + return -1; + return connection_consider_sending_sendme(conn); } int connection_ap_finished_flushing(connection_t *conn) { @@ -360,7 +363,8 @@ int connection_ap_finished_flushing(connection_t *conn) { switch(conn->state) { case AP_CONN_STATE_OPEN: /* FIXME down the road, we'll clear out circuits that are pending to close */ - connection_watch_events(conn, POLLIN); + connection_stop_writing(conn); + connection_consider_sending_sendme(conn); return 0; default: log(LOG_DEBUG,"Bug: connection_ap_finished_flushing() called in unexpected state."); @@ -377,7 +381,7 @@ int connection_ap_create_listener(RSA *prkey, struct sockaddr_in *local) { } int connection_ap_handle_listener_read(connection_t *conn) { - log(LOG_NOTICE,"AP: Received a connection request. Waiting for keys."); + log(LOG_NOTICE,"AP: Received a connection request. Waiting for SS."); return connection_handle_listener_read(conn, CONN_TYPE_AP, AP_CONN_STATE_SS_WAIT); } diff --git a/src/or/connection_exit.c b/src/or/connection_exit.c index 00e4ec14bb..c2ebca5673 100644 --- a/src/or/connection_exit.c +++ b/src/or/connection_exit.c @@ -50,12 +50,14 @@ int connection_exit_finished_flushing(connection_t *conn) { conn->address,ntohs(conn->port)); conn->state = EXIT_CONN_STATE_OPEN; - connection_flush_buf(conn); /* in case there are any queued data cells */ - connection_watch_events(conn, POLLIN); + if(connection_wants_to_flush(conn)) /* in case there are any queued data cells */ + connection_start_writing(conn); + connection_start_reading(conn); return 0; case EXIT_CONN_STATE_OPEN: /* FIXME down the road, we'll clear out circuits that are pending to close */ - connection_watch_events(conn, POLLIN); + connection_stop_writing(conn); + connection_consider_sending_sendme(conn); return 0; default: log(LOG_DEBUG,"Bug: connection_exit_finished_flushing() called in unexpected state."); @@ -101,7 +103,7 @@ int connection_exit_process_data_cell(cell_t *cell, connection_t *conn) { return -1; } memcpy(&conn->addr, rent->h_addr,rent->h_length); - log(LOG_DEBUG,"connection_exit_process_data_cell(): addr %s resolves to %d.",cell->payload,conn->addr); + log(LOG_DEBUG,"connection_exit_process_data_cell(): addr is %s.",cell->payload); } else if (!conn->port) { /* this cell contains the dest port */ if(!memchr(cell->payload,'\0',cell->length)) { log(LOG_DEBUG,"connection_exit_process_data_cell(): dest_port cell has no \\0. Closing."); @@ -139,7 +141,6 @@ int connection_exit_process_data_cell(cell_t *cell, connection_t *conn) { connection_set_poll_socket(conn); conn->state = EXIT_CONN_STATE_CONNECTING; - /* i think only pollout is needed, but i'm curious if pollin ever gets caught -RD */ log(LOG_DEBUG,"connection_exit_process_data_cell(): connect in progress, socket %d.",s); connection_watch_events(conn, POLLOUT | POLLIN); return 0; @@ -161,11 +162,12 @@ int connection_exit_process_data_cell(cell_t *cell, connection_t *conn) { return 0; case EXIT_CONN_STATE_CONNECTING: log(LOG_DEBUG,"connection_exit_process_data_cell(): Data receiving while connecting. Queueing."); - /* this sets us to POLLOUT | POLLIN, which is ok because we need to keep listening for - * writable for connect() to finish */ - return connection_write_to_buf(cell->payload, cell->length, conn); + /* we stay listening for writable, so connect() can finish */ + /* fall through to the next state -- write the cell and consider sending back a sendme */ case EXIT_CONN_STATE_OPEN: - return connection_write_to_buf(cell->payload, cell->length, conn); + if(connection_write_to_buf(cell->payload, cell->length, conn) < 0) + return -1; + return connection_consider_sending_sendme(conn); } return 0; diff --git a/src/or/connection_op.c b/src/or/connection_op.c index 1f3b6c0473..60dc8a2ba0 100644 --- a/src/or/connection_op.c +++ b/src/or/connection_op.c @@ -98,7 +98,7 @@ int connection_op_finished_flushing(connection_t *conn) { switch(conn->state) { case OP_CONN_STATE_OPEN: /* FIXME down the road, we'll clear out circuits that are pending to close */ - connection_watch_events(conn, POLLIN); + connection_stop_writing(conn); return 0; default: log(LOG_DEBUG,"Bug: connection_op_finished_flushing() called in unexpected state."); diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 2542e5f4b2..cc2cc57b0c 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -97,7 +97,7 @@ int connection_or_finished_flushing(connection_t *conn) { return 0; case OR_CONN_STATE_OPEN: /* FIXME down the road, we'll clear out circuits that are pending to close */ - connection_watch_events(conn, POLLIN); + connection_stop_writing(conn); return 0; default: log(LOG_DEBUG,"Bug: connection_or_finished_flushing() called in unexpected state."); @@ -187,9 +187,8 @@ connection_t *connection_or_connect(routerinfo_t *router, RSA *prkey, struct soc return NULL; } - /* i think only pollout is needed, but i'm curious if pollin ever gets caught -RD */ log(LOG_DEBUG,"connection_or_connect() : connect in progress."); - connection_watch_events(conn, POLLOUT | POLLIN); + connection_watch_events(conn, POLLIN | POLLOUT); /* writable indicates finish, readable indicates broken link */ *result = 1; /* connecting */ return conn; diff --git a/src/or/main.c b/src/or/main.c index f50bff8e5d..f5940c47d3 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -198,6 +198,7 @@ void connection_stop_reading(connection_t *conn) { assert(conn && conn->poll_index < nfds); + log(LOG_DEBUG,"connection_stop_reading() called."); if(poll_array[conn->poll_index].events & POLLIN) poll_array[conn->poll_index].events -= POLLIN; } @@ -209,6 +210,22 @@ void connection_start_reading(connection_t *conn) { poll_array[conn->poll_index].events |= POLLIN; } +void connection_stop_writing(connection_t *conn) { + + assert(conn && conn->poll_index < nfds); + + if(poll_array[conn->poll_index].events & POLLOUT) + poll_array[conn->poll_index].events -= POLLOUT; +} + +void connection_start_writing(connection_t *conn) { + + assert(conn && conn->poll_index < nfds); + + poll_array[conn->poll_index].events |= POLLOUT; +} + + void check_conn_read(int i) { int retval; connection_t *conn; @@ -257,7 +274,7 @@ void check_conn_write(int i) { if(poll_array[i].revents & POLLOUT) { /* something to write */ conn = connection_array[i]; - log(LOG_DEBUG,"check_conn_write(): socket %d wants to write.",conn->s); +// log(LOG_DEBUG,"check_conn_write(): socket %d wants to write.",conn->s); if(conn->type == CONN_TYPE_OP_LISTENER || conn->type == CONN_TYPE_OR_LISTENER) { @@ -277,7 +294,7 @@ void check_conn_write(int i) { connection_free(conn); if(i<nfds) { /* we just replaced the one at i with a new one. process it too. */ - check_conn_read(i); + check_conn_write(i); } } } @@ -327,8 +344,9 @@ int prepare_for_poll(int *timeout) { if(need_to_refill_buckets) { if(now.tv_sec > current_second) { /* the second has already rolled over! */ - log(LOG_DEBUG,"prepare_for_poll(): The second has rolled over, immediately refilling."); - increment_receiver_buckets(); +// log(LOG_DEBUG,"prepare_for_poll(): The second has rolled over, immediately refilling."); + for(i=0;i<nfds;i++) + connection_increment_receiver_bucket(connection_array[i]); current_second = now.tv_sec; /* remember which second it is, for next time */ } *timeout = 1000 - (now.tv_usec / 1000); /* how many milliseconds til the next second? */ @@ -339,7 +357,7 @@ int prepare_for_poll(int *timeout) { /* now check which conn wants to speak soonest */ for(i=0;i<nfds;i++) { tmpconn = connection_array[i]; - if(tmpconn->type != CONN_TYPE_OR && tmpconn->type != CONN_TYPE_OP) + if(!connection_speaks_cells(tmpconn)) continue; /* this conn type doesn't send cells */ if(!connection_state_is_open(tmpconn)) continue; /* only conns in state 'open' have a valid send_timeval */ @@ -372,13 +390,6 @@ int prepare_for_poll(int *timeout) { return 0; } -void increment_receiver_buckets(void) { - int i; - - for(i=0;i<nfds;i++) - connection_increment_receiver_bucket(connection_array[i]); -} - int do_main_loop(void) { int i; int timeout; diff --git a/src/or/or.h b/src/or/or.h index 1db2e5462e..a72f0d88b8 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -112,6 +112,9 @@ /* default cipher function */ #define ONION_DEFAULT_CIPHER ONION_CIPHER_DES +#define RECEIVE_WINDOW_START 100 +#define RECEIVE_WINDOW_INCREMENT 10 + typedef uint16_t aci_t; typedef struct @@ -240,6 +243,8 @@ typedef struct uint16_t n_port; connection_t *p_conn; connection_t *n_conn; + int n_receive_window; + int p_receive_window; aci_t p_aci; /* connection identifiers */ aci_t n_aci; @@ -370,6 +375,7 @@ void circuit_about_to_close_connection(connection_t *conn); void command_process_cell(cell_t *cell, connection_t *conn); void command_process_create_cell(cell_t *cell, connection_t *conn); +void command_process_sendme_cell(cell_t *cell, connection_t *conn); void command_process_data_cell(cell_t *cell, connection_t *conn); void command_process_destroy_cell(cell_t *cell, connection_t *conn); @@ -402,6 +408,8 @@ int connection_read_to_buf(connection_t *conn); int connection_fetch_from_buf(char *string, int len, connection_t *conn); +int connection_outbuf_too_full(connection_t *conn); +int connection_wants_to_flush(connection_t *conn); int connection_flush_buf(connection_t *conn); int connection_write_to_buf(char *string, int len, connection_t *conn); @@ -413,6 +421,7 @@ void connection_increment_receiver_bucket (connection_t *conn); void connection_increment_send_timeval(connection_t *conn); void connection_init_timeval(connection_t *conn); +int connection_speaks_cells(connection_t *conn); int connection_state_is_open(connection_t *conn); int connection_send_destroy(aci_t aci, connection_t *conn); @@ -423,6 +432,7 @@ int connection_process_inbuf(connection_t *conn); int connection_package_raw_inbuf(connection_t *conn); int connection_process_cell_from_inbuf(connection_t *conn); +int connection_consider_sending_sendme(connection_t *conn); int connection_finished_flushing(connection_t *conn); /********************************* connection_ap.c ****************************/ @@ -513,13 +523,14 @@ connection_t *connect_to_router_as_op(routerinfo_t *router); void connection_watch_events(connection_t *conn, short events); void connection_stop_reading(connection_t *conn); void connection_start_reading(connection_t *conn); +void connection_stop_writing(connection_t *conn); +void connection_start_writing(connection_t *conn); void check_conn_read(int i); void check_conn_marked(int i); void check_conn_write(int i); int prepare_for_poll(int *timeout); -void increment_receiver_buckets(void); int do_main_loop(void); |