aboutsummaryrefslogtreecommitdiff
path: root/src/or/connection_ap.c
diff options
context:
space:
mode:
authorRoger Dingledine <arma@torproject.org>2003-01-26 09:02:24 +0000
committerRoger Dingledine <arma@torproject.org>2003-01-26 09:02:24 +0000
commitc35373a2cfae66d41e4120469096acde10cea050 (patch)
treeff8e17e0d6ebcc365ff01e75a0be8c1120604be2 /src/or/connection_ap.c
parentbf52b6d1f4b85c2267a0ef68dcc6ad8438f2358b (diff)
downloadtor-c35373a2cfae66d41e4120469096acde10cea050.tar
tor-c35373a2cfae66d41e4120469096acde10cea050.tar.gz
major overhaul: dns slave subsystem, topics
on startup, it forks off a master dns handler, which forks off dns slaves (like the apache model). slaves as spawned as load increases, and then reused. excess slaves are not ever killed, currently. implemented topics. each topic has a receive window in each direction at each edge of the circuit, and sends sendme's at the data level, as per before. each circuit also has receive windows in each direction at each hop; an edge sends a circuit-level sendme as soon as enough data cells have arrived (regardless of whether the data cells were flushed to the exit conns). removed the 'connected' cell type, since it's now a topic command within data cells. at the edge of the circuit, there can be multiple connections associated with a single circuit. you find them via the linked list conn->next_topic. currently each new ap connection starts its own circuit, so we ought to see comparable performance to what we had before. but that's only because i haven't written the code to reattach to old circuits. please try to break it as-is, and then i'll make it reuse the same circuit and we'll try to break that. svn:r152
Diffstat (limited to 'src/or/connection_ap.c')
-rw-r--r--src/or/connection_ap.c203
1 files changed, 143 insertions, 60 deletions
diff --git a/src/or/connection_ap.c b/src/or/connection_ap.c
index adfceda90..978a9dd02 100644
--- a/src/or/connection_ap.c
+++ b/src/or/connection_ap.c
@@ -22,7 +22,10 @@ int connection_ap_process_inbuf(connection_t *conn) {
case AP_CONN_STATE_SOCKS_WAIT:
return ap_handshake_process_socks(conn);
case AP_CONN_STATE_OPEN:
- return connection_package_raw_inbuf(conn);
+ if(connection_package_raw_inbuf(conn) < 0)
+ return -1;
+ circuit_consider_stop_edge_reading(circuit_get_by_conn(conn), EDGE_AP);
+ return 0;
default:
log(LOG_DEBUG,"connection_ap_process_inbuf() called in state where I'm waiting. Ignoring buf for now.");
}
@@ -33,6 +36,7 @@ int connection_ap_process_inbuf(connection_t *conn) {
int ap_handshake_process_socks(connection_t *conn) {
char c;
socks4_t socks4_info;
+ circuit_t *circ;
assert(conn);
@@ -118,8 +122,25 @@ int ap_handshake_process_socks(connection_t *conn) {
}
}
- /* now we're all ready to make an onion, etc */
- return ap_handshake_create_onion(conn);
+ /* find the circuit that we should use, if there is one. */
+ circ = NULL; /* FIXME don't reuse circs, at least for me. */
+
+ /* now we're all ready to make an onion or send a begin */
+
+ if(circ) {
+ if(circ->state == CIRCUIT_STATE_OPEN) {
+ if(ap_handshake_send_begin(conn, circ) < 0) {
+ circuit_close(circ);
+ return -1;
+ }
+ }
+ } else {
+ if(ap_handshake_create_onion(conn) < 0) {
+ circuit_close(circ);
+ return -1;
+ }
+ }
+ return 0;
}
int ap_handshake_create_onion(connection_t *conn) {
@@ -240,13 +261,18 @@ void ap_handshake_n_conn_open(connection_t *or_conn) {
}
connection_start_reading(p_conn); /* resume listening for reads */
log(LOG_DEBUG,"ap_handshake_n_conn_open(): Found circ, sending onion.");
- if(ap_handshake_send_onion(p_conn, or_conn, circ)<0) {
+ if(ap_handshake_send_onion(p_conn, or_conn, circ) < 0) {
log(LOG_DEBUG,"ap_handshake_n_conn_open(): circuit marked for closing.");
- p_conn->marked_for_close = 1;
- return; /* XXX will want to try the rest too */
- } else {
- circ = circuit_enumerate_by_naddr_nport(circ, or_conn->addr, or_conn->port);
+ circuit_close(circ);
+ return; /* FIXME will want to try the other circuits too? */
}
+ for(p_conn = p_conn->next_topic; p_conn; p_conn = p_conn->next_topic) { /* start up any other pending topics */
+ if(ap_handshake_send_begin(p_conn, circ) < 0) {
+ circuit_close(circ);
+ return;
+ }
+ }
+ circ = circuit_enumerate_by_naddr_nport(circ, or_conn->addr, or_conn->port);
}
}
@@ -289,50 +315,39 @@ int ap_handshake_send_onion(connection_t *ap_conn, connection_t *n_conn, circuit
}
free(tmpbuf);
-#if 0
- /* deliver the ss in a data cell */
- cell.command = CELL_DATA;
- cell.aci = circ->n_aci;
- cell.length = sizeof(ss_t);
- memcpy(cell.payload, &ap_conn->ss, sizeof(ss_t));
- log(LOG_DEBUG,"ap_handshake_send_onion(): Sending a data cell for ss...");
- if(circuit_deliver_data_cell(&cell, circ, circ->n_conn, 'e') < 0) {
- log(LOG_DEBUG,"ap_handshake_send_onion(): failed to deliver ss cell. Closing.");
- circuit_close(circ);
+ if(ap_handshake_send_begin(ap_conn, circ) < 0) {
return -1;
}
-#endif
- /* deliver the dest_addr in a data cell */
- cell.command = CELL_DATA;
- cell.aci = circ->n_aci;
- strncpy(cell.payload, ap_conn->dest_addr, CELL_PAYLOAD_SIZE);
- cell.length = strlen(cell.payload)+1;
- log(LOG_DEBUG,"ap_handshake_send_onion(): Sending a data cell for addr...");
- if(circuit_deliver_data_cell(&cell, circ, circ->n_conn, 'e') < 0) {
- log(LOG_DEBUG,"ap_handshake_send_onion(): failed to deliver addr cell. Closing.");
- circuit_close(circ);
- return -1;
- }
+ circ->state = CIRCUIT_STATE_OPEN;
+ /* FIXME should set circ->expire to something here */
+
+ return 0;
+}
+
+int ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ) {
+ cell_t cell;
- /* deliver the dest_port in a data cell */
+ memset(&cell, 0, sizeof(cell_t));
+ /* deliver the dest_addr in a data cell */
cell.command = CELL_DATA;
cell.aci = circ->n_aci;
- snprintf(cell.payload, CELL_PAYLOAD_SIZE, "%d", ap_conn->dest_port);
- cell.length = strlen(cell.payload)+1;
- log(LOG_DEBUG,"ap_handshake_send_onion(): Sending a data cell for port...");
- if(circuit_deliver_data_cell(&cell, circ, circ->n_conn, 'e') < 0) {
- log(LOG_DEBUG,"ap_handshake_send_onion(): failed to deliver port cell. Closing.");
- circuit_close(circ);
+ crypto_pseudo_rand(3, cell.payload+1); /* byte 0 is blank, bytes 1-3 are random */
+ /* FIXME check for collisions */
+ cell.payload[0] = 0;
+ ap_conn->topic_id = *(uint32_t *)cell.payload;
+ cell.payload[0] = TOPIC_COMMAND_BEGIN;
+ snprintf(cell.payload+4, CELL_PAYLOAD_SIZE-4, "%s,%d", ap_conn->dest_addr, ap_conn->dest_port);
+ cell.length = strlen(cell.payload+TOPIC_HEADER_SIZE)+1+TOPIC_HEADER_SIZE;
+ log(LOG_DEBUG,"ap_handshake_send_begin(): Sending data cell to begin topic %d.", ap_conn->topic_id);
+ if(circuit_deliver_data_cell_from_edge(&cell, circ, EDGE_AP) < 0) {
+ log(LOG_DEBUG,"ap_handshake_send_begin(): failed to deliver begin cell. Closing.");
return -1;
}
-
- circ->state = CIRCUIT_STATE_OPEN;
+ ap_conn->n_receive_topicwindow = TOPICWINDOW_START;
+ ap_conn->p_receive_topicwindow = TOPICWINDOW_START;
ap_conn->state = AP_CONN_STATE_OPEN;
- log(LOG_INFO,"ap_handshake_send_onion(): Address/port sent, ap socket %d, n_aci %d",ap_conn->s,circ->n_aci);
-
- /* FIXME should set circ->expire to something here */
-
+ log(LOG_INFO,"ap_handshake_send_begin(): Address/port sent, ap socket %d, n_aci %d",ap_conn->s,circ->n_aci);
return 0;
}
@@ -351,30 +366,99 @@ int ap_handshake_socks_reply(connection_t *conn, char result) {
return connection_flush_buf(conn); /* try to flush it, in case we're about to close the conn */
}
-int connection_ap_send_connected(connection_t *conn) {
- assert(conn);
+int connection_ap_process_data_cell(cell_t *cell, circuit_t *circ) {
+ connection_t *conn;
+ int topic_command;
+ int topic_id;
- return ap_handshake_socks_reply(conn, SOCKS4_REQUEST_GRANTED);
-}
+ /* an incoming data cell has arrived */
-int connection_ap_process_data_cell(cell_t *cell, connection_t *conn) {
+ assert(cell && circ);
- /* an incoming data cell has arrived */
+ topic_command = *cell->payload;
+ *cell->payload = 0;
+ topic_id = *(uint32_t *)cell->payload;
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): command %d topic %d", topic_command, topic_id);
- assert(conn && conn->type == CONN_TYPE_AP);
+ circuit_consider_sending_sendme(circ, EDGE_AP);
+
+ for(conn = circ->p_conn; conn && conn->topic_id != topic_id; conn = conn->next_topic) ;
+
+ /* now conn is either NULL, in which case we don't recognize the topic_id, or
+ * it is set, in which case cell is talking about this conn.
+ */
- if(conn->state != AP_CONN_STATE_OPEN) {
+ if(conn && conn->state != AP_CONN_STATE_OPEN) {
/* we should not have gotten this cell */
- log(LOG_DEBUG,"connection_ap_process_data_cell(): Got a data cell when not in 'open' state. Closing.");
- return -1;
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): Got a data cell when not in 'open' state. Dropping.");
+ return 0;
}
-// log(LOG_DEBUG,"connection_ap_process_data_cell(): In state 'open', writing to buf.");
+ switch(topic_command) {
+ case TOPIC_COMMAND_BEGIN:
+ log(LOG_INFO,"connection_ap_process_data_cell(): topic begin request unsupported. Dropping.");
+ break;
+ case TOPIC_COMMAND_DATA:
+ if(!conn) {
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): data cell dropped, unknown topic %d.",topic_id);
+ return 0;
+ }
+ if(--conn->n_receive_topicwindow < 0) { /* is it below 0 after decrement? */
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): receive_topicwindow at exit below 0. Killing.");
+ return -1; /* exit node breaking protocol. kill the whole circuit. */
+ }
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): willing to receive %d more cells from circ",conn->n_receive_topicwindow);
- if(connection_write_to_buf(cell->payload, cell->length, conn) < 0)
- return -1;
- return connection_consider_sending_sendme(conn);
-}
+ if(connection_write_to_buf(cell->payload + TOPIC_HEADER_SIZE,
+ cell->length - TOPIC_HEADER_SIZE, conn) < 0) {
+ conn->marked_for_close = 1;
+ return 0;
+ }
+ if(connection_consider_sending_sendme(conn, EDGE_AP) < 0)
+ conn->marked_for_close = 1;
+ return 0;
+ case TOPIC_COMMAND_END:
+ if(!conn) {
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): end cell dropped, unknown topic %d.",topic_id);
+ return 0;
+ }
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): end cell for topic %d. Removing topic.",topic_id);
+
+ /* go through and identify who points to conn. remove conn from the list. */
+#if 0
+ if(conn == circ->p_conn) {
+ circ->p_conn = conn->next_topic;
+ }
+ for(prevconn = circ->p_conn; prevconn->next_topic != conn; prevconn = prevconn->next_topic) ;
+ prevconn->next_topic = conn->next_topic;
+#endif
+ conn->marked_for_close = 1;
+ break;
+ case TOPIC_COMMAND_CONNECTED:
+ if(!conn) {
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): connected cell dropped, unknown topic %d.",topic_id);
+ break;
+ }
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): Connected! Notifying application.");
+ if(ap_handshake_socks_reply(conn, SOCKS4_REQUEST_GRANTED) < 0) {
+ conn->marked_for_close = 1;
+ }
+ break;
+ case TOPIC_COMMAND_SENDME:
+ if(!conn) {
+ log(LOG_DEBUG,"connection_exit_process_data_cell(): sendme cell dropped, unknown topic %d.",topic_id);
+ return 0;
+ }
+ conn->p_receive_topicwindow += TOPICWINDOW_INCREMENT;
+ connection_start_reading(conn);
+ connection_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */
+ circuit_consider_stop_edge_reading(circ, EDGE_AP);
+ break;
+ default:
+ log(LOG_DEBUG,"connection_ap_process_data_cell(): unknown topic command %d.",topic_command);
+ }
+ return 0;
+}
int connection_ap_finished_flushing(connection_t *conn) {
@@ -384,14 +468,13 @@ int connection_ap_finished_flushing(connection_t *conn) {
case AP_CONN_STATE_OPEN:
/* FIXME down the road, we'll clear out circuits that are pending to close */
connection_stop_writing(conn);
- return connection_consider_sending_sendme(conn);
+ return connection_consider_sending_sendme(conn, EDGE_AP);
default:
log(LOG_DEBUG,"Bug: connection_ap_finished_flushing() called in unexpected state.");
return 0;
}
return 0;
-
}
int connection_ap_create_listener(struct sockaddr_in *bindaddr) {