From c6e22ae2b747681b86b6c00c1fe8438f2b4a8e0f Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 4 Aug 2009 09:29:30 -0700 Subject: Basic bufferevent callbacks These are based strongly on connection_handle_read and connection_handle_write, but hopefully without so much mixture of IO logic and Tor logic. --- src/or/connection.c | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++- src/or/connection.h | 6 ++- src/or/or.h | 1 + 3 files changed, 129 insertions(+), 3 deletions(-) diff --git a/src/or/connection.c b/src/or/connection.c index db51bbe53..a68ba7748 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -2605,6 +2605,128 @@ connection_read_to_buf(connection_t *conn, int *max_to_read, int *socket_error) return 0; } +#ifdef USE_BUFFEREVENTS +/* XXXX These generic versions could be simplified by making them + type-specific */ +static void +evbuffer_inbuf_callback(struct evbuffer *buf, + const struct evbuffer_cb_info *info, void *arg) +{ + connection_t *conn = arg; + (void) buf; + /* XXXX These need to get real counts on the non-nested TLS case. - NM */ + if (info->n_added) { + time_t now = approx_time(); + conn->timestamp_lastread = now; + connection_buckets_decrement(conn, now, info->n_added, 0); + connection_consider_empty_read_buckets(conn); + if (conn->type == CONN_TYPE_AP) { + edge_connection_t *edge_conn = TO_EDGE_CONN(conn); + /*XXXX021 check for overflow*/ + edge_conn->n_read += (int)info->n_added; + } + } +} + +static void +evbuffer_outbuf_callback(struct evbuffer *buf, + const struct evbuffer_cb_info *info, void *arg) +{ + connection_t *conn = arg; + (void)buf; + if (info->n_deleted) { + time_t now = approx_time(); + conn->timestamp_lastwritten = now; + connection_buckets_decrement(conn, now, 0, info->n_deleted); + connection_consider_empty_write_buckets(conn); + if (conn->type == CONN_TYPE_AP) { + edge_connection_t *edge_conn = TO_EDGE_CONN(conn); + /*XXXX021 check for overflow*/ + edge_conn->n_written += (int)info->n_deleted; + } + } +} + +static void +connection_handle_read_cb(struct bufferevent *bufev, void *arg) +{ + connection_t *conn = arg; + (void) bufev; + if (!conn->marked_for_close) + if (connection_process_inbuf(conn, 1)<0) /* XXXX Always 1? */ + connection_mark_for_close(conn); +} + +static void +connection_handle_write_cb(struct bufferevent *bufev, void *arg) +{ + connection_t *conn = arg; + (void) bufev; + if (connection_flushed_some(conn)<0) { + connection_mark_for_close(conn); + return; + } + + if (!connection_wants_to_flush(conn)) { + connection_finished_flushing(conn); + } +} + +static void +connection_handle_event_cb(struct bufferevent *bufev, short event, void *arg) +{ + connection_t *conn = arg; + (void) bufev; + if (event & BEV_EVENT_CONNECTED) { + tor_assert(connection_state_is_connecting(conn)); + if (connection_finished_connecting(conn)<0) + return; + } + if (event & BEV_EVENT_EOF) { + if (!conn->marked_for_close) { + conn->inbuf_reached_eof = 1; + if (connection_reached_eof(conn)<0) + return; + } + } + if (event & BEV_EVENT_ERROR) { + int socket_error = evutil_socket_geterror(conn->s); + if (conn->type == CONN_TYPE_OR && + conn->state == OR_CONN_STATE_CONNECTING) { + connection_or_connect_failed(TO_OR_CONN(conn), + errno_to_orconn_end_reason(socket_error), + tor_socket_strerror(socket_error)); + } else if (CONN_IS_EDGE(conn)) { + edge_connection_t *edge_conn = TO_EDGE_CONN(conn); + connection_edge_end_errno(edge_conn); + if (edge_conn->socks_request) /* broken, don't send a socks reply back */ + edge_conn->socks_request->has_finished = 1; + } + connection_close_immediate(conn); /* Connection is dead. */ + connection_mark_for_close(conn); + } +} + +void +connection_configure_bufferevent_callbacks(connection_t *conn) +{ + struct bufferevent *bufev; + struct evbuffer *input, *output; + tor_assert(conn->bufev); + bufev = conn->bufev; + bufferevent_setcb(bufev, + connection_handle_read_cb, + connection_handle_write_cb, + connection_handle_event_cb, + conn); + + input = bufferevent_get_input(bufev); + output = bufferevent_get_output(bufev); + evbuffer_add_cb(input, evbuffer_inbuf_callback, conn); + evbuffer_add_cb(output, evbuffer_outbuf_callback, conn); +} +#endif + /** A pass-through to fetch_from_buf. */ int connection_fetch_from_buf(char *string, size_t len, connection_t *conn) @@ -2760,6 +2882,7 @@ connection_handle_write_impl(connection_t *conn, int force) /* If we just flushed the last bytes, check if this tunneled dir * request is done. */ + /* XXXX move this to flushed_some or finished_flushing -NM */ if (buf_datalen(conn->outbuf) == 0 && conn->dirreq_id) geoip_change_dirreq_state(conn->dirreq_id, DIRREQ_TUNNELED, DIRREQ_OR_CONN_BUFFER_FLUSHED); @@ -3433,7 +3556,7 @@ assert_connection_ok(connection_t *conn, time_t now) tor_assert(conn->type >= _CONN_TYPE_MIN); tor_assert(conn->type <= _CONN_TYPE_MAX); -#ifdef USE_BUFFEREVENT +#ifdef USE_BUFFEREVENTS if (conn->bufev) { tor_assert(conn->read_event == NULL); tor_assert(conn->write_event == NULL); diff --git a/src/or/connection.h b/src/or/connection.h index 36860117d..13a283122 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -12,10 +12,8 @@ #ifndef _TOR_CONNECTION_H #define _TOR_CONNECTION_H -#ifndef USE_BUFFEREVENTS /* XXXX For buf_datalen in inline function */ #include "buffers.h" -#endif const char *conn_type_to_string(int type); const char *conn_state_to_string(int type, int state); @@ -126,5 +124,9 @@ int connection_or_nonopen_was_started_here(or_connection_t *conn); void connection_dump_buffer_mem_stats(int severity); void remove_file_if_very_old(const char *fname, time_t now); +#ifdef USE_BUFFEREVENTS +void connection_configure_bufferevent_callbacks(connection_t *conn); +#endif + #endif diff --git a/src/or/or.h b/src/or/or.h index e0c259b04..e82cd4e1b 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -86,6 +86,7 @@ #ifdef USE_BUFFEREVENTS #include #include +#include #endif #include "crypto.h" -- cgit v1.2.3