aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2009-08-11 15:16:16 -0400
committerNick Mathewson <nickm@torproject.org>2010-09-27 12:31:13 -0400
commit4af6887d201d978a46072ead0036e0d16fa5908a (patch)
tree6d4c6d53f20ac02ef713fd01a211a57ff26c6510
parentb63f6518cbdc4c80b09399bc17d3bec3cac76ad9 (diff)
downloadtor-4af6887d201d978a46072ead0036e0d16fa5908a.tar
tor-4af6887d201d978a46072ead0036e0d16fa5908a.tar.gz
Add support for linked connections with bufferevent_pair.
Also, set directory connections (linked and otherwise) to use bufferevents. Also, stop using outbuf_flushlen anywhere except for OR connections.
-rw-r--r--src/or/connection.c2
-rw-r--r--src/or/connection.h4
-rw-r--r--src/or/connection_edge.c22
-rw-r--r--src/or/connection_edge.h3
-rw-r--r--src/or/directory.c11
-rw-r--r--src/or/main.c90
6 files changed, 102 insertions, 30 deletions
diff --git a/src/or/connection.c b/src/or/connection.c
index 394aa3b83..8b9d47d5d 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -191,8 +191,8 @@ connection_type_uses_bufferevent(connection_t *conn)
{
switch (conn->type) {
case CONN_TYPE_AP:
- return 1;
case CONN_TYPE_EXIT:
+ case CONN_TYPE_DIR:
return 1;
default:
return 0;
diff --git a/src/or/connection.h b/src/or/connection.h
index 4d269d649..adf79f139 100644
--- a/src/or/connection.h
+++ b/src/or/connection.h
@@ -100,7 +100,7 @@ connection_get_inbuf_len(connection_t *conn)
IF_HAS_BUFFEREVENT(conn, {
return evbuffer_get_length(bufferevent_get_input(conn->bufev));
}) ELSE_IF_NO_BUFFEREVENT {
- return buf_datalen(conn->inbuf);
+ return conn->inbuf ? buf_datalen(conn->inbuf) : 0;
}
}
@@ -110,7 +110,7 @@ connection_get_outbuf_len(connection_t *conn)
IF_HAS_BUFFEREVENT(conn, {
return evbuffer_get_length(bufferevent_get_output(conn->bufev));
}) ELSE_IF_NO_BUFFEREVENT {
- return buf_datalen(conn->outbuf);
+ return conn->outbuf ? buf_datalen(conn->outbuf) : 0;
}
}
diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c
index f90c44f58..39bc8e7c0 100644
--- a/src/or/connection_edge.c
+++ b/src/or/connection_edge.c
@@ -357,8 +357,9 @@ connection_edge_finished_connecting(edge_connection_t *edge_conn)
rep_hist_note_exit_stream_opened(conn->port);
conn->state = EXIT_CONN_STATE_OPEN;
- connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */
- if (connection_wants_to_flush(conn)) /* in case there are any queued relay
+ IF_HAS_NO_BUFFEREVENT(conn)
+ connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */
+ if (connection_get_outbuf_len(conn)) /* in case there are any queued relay
* cells */
connection_start_writing(conn);
/* deliver a 'connected' relay cell back through the circuit. */
@@ -2109,8 +2110,10 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn)
ap_conn->socks_request->port);
payload_len = (int)strlen(payload)+1;
- log_debug(LD_APP,
- "Sending relay cell to begin stream %d.", ap_conn->stream_id);
+ log_info(LD_APP,
+ "Sending relay cell %d to begin stream %d.",
+ (int)ap_conn->use_begindir,
+ ap_conn->stream_id);
begin_type = ap_conn->use_begindir ?
RELAY_COMMAND_BEGIN_DIR : RELAY_COMMAND_BEGIN;
@@ -2218,9 +2221,11 @@ connection_ap_handshake_send_resolve(edge_connection_t *ap_conn)
* and call connection_ap_handshake_attach_circuit(conn) on it.
*
* Return the other end of the linked connection pair, or -1 if error.
+ * DOCDOC partner.
*/
edge_connection_t *
-connection_ap_make_link(char *address, uint16_t port,
+connection_ap_make_link(connection_t *partner,
+ char *address, uint16_t port,
const char *digest, int use_begindir, int want_onehop)
{
edge_connection_t *conn;
@@ -2255,6 +2260,8 @@ connection_ap_make_link(char *address, uint16_t port,
tor_addr_make_unspec(&conn->_base.addr);
conn->_base.port = 0;
+ connection_link_connections(partner, TO_CONN(conn));
+
if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */
connection_free(TO_CONN(conn));
return NULL;
@@ -2772,12 +2779,13 @@ connection_exit_connect(edge_connection_t *edge_conn)
}
conn->state = EXIT_CONN_STATE_OPEN;
- if (connection_wants_to_flush(conn)) {
+ if (connection_get_outbuf_len(conn)) {
/* in case there are any queued data cells */
log_warn(LD_BUG,"newly connected conn had data waiting!");
// connection_start_writing(conn);
}
- connection_watch_events(conn, READ_EVENT);
+ IF_HAS_NO_BUFFEREVENT(conn)
+ connection_watch_events(conn, READ_EVENT);
/* also, deliver a 'connected' cell back through the circuit. */
if (connection_edge_is_rendezvous_stream(edge_conn)) {
diff --git a/src/or/connection_edge.h b/src/or/connection_edge.h
index 762af5172..0f7bf0780 100644
--- a/src/or/connection_edge.h
+++ b/src/or/connection_edge.h
@@ -29,7 +29,8 @@ int connection_edge_finished_connecting(edge_connection_t *conn);
int connection_ap_handshake_send_begin(edge_connection_t *ap_conn);
int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn);
-edge_connection_t *connection_ap_make_link(char *address, uint16_t port,
+edge_connection_t *connection_ap_make_link(connection_t *partner,
+ char *address, uint16_t port,
const char *digest,
int use_begindir, int want_onehop);
void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply,
diff --git a/src/or/directory.c b/src/or/directory.c
index 284a1ad12..ac6f205fb 100644
--- a/src/or/directory.c
+++ b/src/or/directory.c
@@ -892,14 +892,14 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr,
* hook up both sides
*/
linked_conn =
- connection_ap_make_link(conn->_base.address, conn->_base.port,
+ connection_ap_make_link(TO_CONN(conn),
+ conn->_base.address, conn->_base.port,
digest, use_begindir, conn->dirconn_direct);
if (!linked_conn) {
log_warn(LD_NET,"Making tunnel to dirserver failed.");
connection_mark_for_close(TO_CONN(conn));
return;
}
- connection_link_connections(TO_CONN(conn), TO_CONN(linked_conn));
if (connection_add(TO_CONN(conn)) < 0) {
log_warn(LD_NET,"Unable to add connection for link to dirserver.");
@@ -912,8 +912,12 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr,
payload, payload_len,
supports_conditional_consensus,
if_modified_since);
+
connection_watch_events(TO_CONN(conn), READ_EVENT|WRITE_EVENT);
- connection_start_reading(TO_CONN(linked_conn));
+ IF_HAS_BUFFEREVENT(TO_CONN(linked_conn), {
+ connection_watch_events(TO_CONN(linked_conn), READ_EVENT|WRITE_EVENT);
+ }) ELSE_IF_NO_BUFFEREVENT
+ connection_start_reading(TO_CONN(linked_conn));
}
}
@@ -3352,6 +3356,7 @@ connection_dir_finished_flushing(dir_connection_t *conn)
DIRREQ_DIRECT,
DIRREQ_FLUSHING_DIR_CONN_FINISHED);
switch (conn->_base.state) {
+ case DIR_CONN_STATE_CONNECTING:
case DIR_CONN_STATE_CLIENT_SENDING:
log_debug(LD_DIR,"client finished sending command.");
conn->_base.state = DIR_CONN_STATE_CLIENT_READING;
diff --git a/src/or/main.c b/src/or/main.c
index 976d805e1..cba98a884 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -155,6 +155,32 @@ int can_complete_circuit=0;
*
****************************************************************************/
+#ifdef USE_BUFFEREVENTS
+static void
+free_old_inbuf(connection_t *conn)
+{
+ if (! conn->inbuf)
+ return;
+
+ tor_assert(conn->outbuf);
+ tor_assert(buf_datalen(conn->inbuf) == 0);
+ tor_assert(buf_datalen(conn->outbuf) == 0);
+ buf_free(conn->inbuf);
+ buf_free(conn->outbuf);
+ conn->inbuf = conn->outbuf = NULL;
+
+ if (conn->read_event) {
+ event_del(conn->read_event);
+ tor_event_free(conn->read_event);
+ }
+ if (conn->write_event) {
+ event_del(conn->read_event);
+ tor_event_free(conn->write_event);
+ }
+ conn->read_event = conn->write_event = NULL;
+}
+#endif
+
/** Add <b>conn</b> to the array of connections that we can poll on. The
* connection's socket must be set; the connection starts out
* non-reading and non-writing.
@@ -173,28 +199,47 @@ connection_add_impl(connection_t *conn, int is_connecting)
smartlist_add(connection_array, conn);
#ifdef USE_BUFFEREVENTS
- if (connection_type_uses_bufferevent(conn) &&
- conn->s >= 0 && !conn->linked) {
- conn->bufev = bufferevent_socket_new(
+ if (connection_type_uses_bufferevent(conn)) {
+ if (conn->s >= 0 && !conn->linked) {
+ conn->bufev = bufferevent_socket_new(
tor_libevent_get_base(),
conn->s,
BEV_OPT_DEFER_CALLBACKS);
- if (conn->inbuf) {
+ /* XXXX CHECK FOR NULL RETURN! */
+ if (is_connecting) {
+ /* Put the bufferevent into a "connecting" state so that we'll get
+ * a "connected" event callback on successful write. */
+ bufferevent_socket_connect(conn->bufev, NULL, 0);
+ }
+ connection_configure_bufferevent_callbacks(conn);
+ } else if (conn->linked && conn->linked_conn &&
+ connection_type_uses_bufferevent(conn->linked_conn)) {
+ tor_assert(conn->s < 0);
+ if (!conn->bufev) {
+ struct bufferevent *pair[2] = { NULL, NULL };
+ /* XXXX CHECK FOR ERROR RETURN! */
+ bufferevent_pair_new(tor_libevent_get_base(),
+ BEV_OPT_DEFER_CALLBACKS,
+ pair);
+ tor_assert(pair[0]);
+ conn->bufev = pair[0];
+ conn->linked_conn->bufev = pair[1];
+ } /* else the other side already was added, and got a bufferevent_pair */
+ connection_configure_bufferevent_callbacks(conn);
+ }
+
+ if (conn->bufev && conn->inbuf) {
/* XXX Instead we should assert that there is no inbuf, once we
* have linked connections using bufferevents. */
- tor_assert(conn->outbuf);
- tor_assert(buf_datalen(conn->inbuf) == 0);
- tor_assert(buf_datalen(conn->outbuf) == 0);
- buf_free(conn->inbuf);
- buf_free(conn->outbuf);
- conn->inbuf = conn->outbuf = NULL;
+ free_old_inbuf(conn);
}
- if (is_connecting) {
- /* Put the bufferevent into a "connecting" state so that we'll get
- * a "connected" event callback on successful write. */
- bufferevent_socket_connect(conn->bufev, NULL, 0);
+
+ if (conn->linked_conn && conn->linked_conn->bufev &&
+ conn->linked_conn->inbuf) {
+ /* XXX Instead we should assert that there is no inbuf, once we
+ * have linked connections using bufferevents. */
+ free_old_inbuf(conn->linked_conn);
}
- connection_configure_bufferevent_callbacks(conn);
}
#else
(void) is_connecting;
@@ -205,6 +250,7 @@ connection_add_impl(connection_t *conn, int is_connecting)
conn->s, EV_READ|EV_PERSIST, conn_read_callback, conn);
conn->write_event = tor_event_new(tor_libevent_get_base(),
conn->s, EV_WRITE|EV_PERSIST, conn_write_callback, conn);
+ /* XXXX CHECK FOR NULL RETURN! */
}
log_debug(LD_NET,"new conn type %s, socket %d, address %s, n_conns %d.",
@@ -671,11 +717,19 @@ conn_close_if_marked(int i)
/* assert_all_pending_dns_resolves_ok(); */
#ifdef USE_BUFFEREVENTS
- if (conn->bufev && conn->hold_open_until_flushed)
+ if (conn->bufev && conn->hold_open_until_flushed) {
+ if (conn->linked) {
+ /* We need to do this explicitly so that the linked connection
+ * notices that there was an EOF. */
+ bufferevent_flush(conn->bufev, EV_WRITE, BEV_FINISHED);
+ /* XXXX Now can we free it? */
+ }
return 0;
+ }
#endif
log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s);
+ IF_HAS_BUFFEREVENT(conn, goto unlink);
if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) {
/* s == -1 means it's an incomplete edge connection, or that the socket
* has already been closed as unflushable. */
@@ -743,6 +797,10 @@ conn_close_if_marked(int i)
conn->marked_for_close);
}
}
+
+#ifdef USE_BUFFEREVENTS
+ unlink:
+#endif
connection_unlink(conn); /* unlink, remove, free */
return 1;
}