aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2003-03-17 02:42:45 +0000
committerNick Mathewson <nickm@torproject.org>2003-03-17 02:42:45 +0000
commit6deed60bb5b5f495b4812f15c0e7a3b21fc440e4 (patch)
treec252e7c1d706abdf838ea32166c384819f6a6b4d
parent9a6b01ba44d1c43865d0c4da08133737dcb761cf (diff)
downloadtor-6deed60bb5b5f495b4812f15c0e7a3b21fc440e4.tar
tor-6deed60bb5b5f495b4812f15c0e7a3b21fc440e4.tar.gz
Add code for end-to-end zlib compression. Still needs flow-control
svn:r187
-rw-r--r--TODO6
-rw-r--r--src/or/buffers.c68
-rw-r--r--src/or/connection.c100
-rw-r--r--src/or/connection_ap.c16
-rw-r--r--src/or/connection_exit.c17
-rw-r--r--src/or/or.h46
6 files changed, 246 insertions, 7 deletions
diff --git a/TODO b/TODO
index 79f1b3be5..5b11ee839 100644
--- a/TODO
+++ b/TODO
@@ -19,8 +19,8 @@ ARMA - arma claims
o Implement topics
- Rotate circuits after N minutes?
- Circuits should expire when circuit->expire triggers
-NICK - Handle half-open connections
-NICK - On the fly compression of each stream
+NICK . Handle half-open connections
+NICK . On the fly compression of each stream
o Clean up the event loop (optimize and sanitize)
- Exit policies
- Path selection algorithms
@@ -88,7 +88,7 @@ SPEC!! - Handle socks commands other than connect, eg, bind?
- Keep track of load over links/nodes, to
know who's hosed
NICK - Daemonize and package
- - Teach it to fork and background
+ o Teach it to fork and background
- Red Hat spec file
- Debian spec file equivalent
diff --git a/src/or/buffers.c b/src/or/buffers.c
index 424e2fe9e..61706fcad 100644
--- a/src/or/buffers.c
+++ b/src/or/buffers.c
@@ -136,6 +136,74 @@ int write_to_buf(char *string, int string_len,
}
+#ifdef USE_ZLIB
+int compress_from_buf(char *string, int string_len,
+ char **buf_in, int *buflen_in, int *buf_datalen_in,
+ z_stream *zstream, int flush) {
+ int err;
+
+ if (!*buf_datalen_in)
+ return 0;
+
+ zstream->next_in = *buf_in;
+ zstream->avail_in = *buf_datalen_in;
+ zstream->next_out = string;
+ zstream->avail_out = string_len;
+
+ err = deflate(zstream, flush);
+
+ switch (err)
+ {
+ case Z_OK:
+ case Z_STREAM_END:
+ memmove(*buf_in, zstream->next_in, zstream->avail_in);
+ *buf_datalen_in = zstream->avail_in;
+ return string_len - zstream->avail_out;
+ case Z_STREAM_ERROR:
+ case Z_BUF_ERROR:
+ log(LOG_ERR, "Error processing compression: %s", zstream->msg);
+ return -1;
+ default:
+ log(LOG_ERR, "Unknown return value from deflate: %d", err);
+ return -1;
+ }
+}
+
+int decompress_buf_to_buf(char **buf_in, int *buflen_in, int *buf_datalen_in,
+ char **buf_out, int *buflen_out, int *buf_datalen_out,
+ z_stream *zstream, int flush)
+{
+ int err;
+
+ zstream->next_in = *buf_in;
+ zstream->avail_in = *buf_datalen_in;
+ zstream->next_out = *buf_out + *buf_datalen_out;
+ zstream->avail_out = *buflen_out - *buf_datalen_out;
+
+ if (!zstream->avail_in && !zstream->avail_out)
+ return 0;
+
+ err = inflate(zstream, flush);
+
+ switch (err)
+ {
+ case Z_OK:
+ case Z_STREAM_END:
+ memmove(*buf_in, zstream->next_in, zstream->avail_in);
+ *buf_datalen_in = zstream->avail_in;
+ *buf_datalen_out = *buflen_out - zstream->avail_out;
+ return 1;
+ case Z_STREAM_ERROR:
+ case Z_BUF_ERROR:
+ log(LOG_ERR, "Error processing compression: %s", zstream->msg);
+ return 1;
+ default:
+ log(LOG_ERR, "Unknown return value from deflate: %d", err);
+ return -1;
+ }
+}
+#endif
+
int fetch_from_buf(char *string, int string_len,
char **buf, int *buflen, int *buf_datalen) {
diff --git a/src/or/connection.c b/src/or/connection.c
index 4108aa2a2..23743cecd 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -126,6 +126,29 @@ connection_t *connection_new(int type) {
if(type == CONN_TYPE_OR) {
directory_set_dirty();
}
+#ifdef USE_ZLIB
+ if (type == CONN_TYPE_AP || type == CONN_TYPE_EXIT) {
+ if (buf_new(&conn->z_outbuf, &conn->z_outbuflen, &conn->z_outbuf_datalen) < 0)
+ return NULL;
+ if (! (conn->compression = malloc(sizeof(z_stream))))
+ return NULL;
+ if (! (conn->decompression = malloc(sizeof(z_stream))))
+ return NULL;
+ memset(conn->compression, 0, sizeof(z_stream));
+ memset(conn->decompression, 0, sizeof(z_stream));
+ if (deflateInit(conn->compression, Z_DEFAULT_COMPRESSION) != Z_OK) {
+ log(LOG_ERR, "Error initializing zlib: %s", conn->compression->msg);
+ return NULL;
+ }
+ if (inflateInit(conn->decompression) != Z_OK) {
+ log(LOG_ERR, "Error initializing zlib: %s", conn->decompression->msg);
+ return NULL;
+ }
+ } else {
+ conn->compression = conn->decompression = NULL;
+ }
+ conn->done_sending = conn->done_receiving = 0
+#endif
return conn;
}
@@ -156,6 +179,19 @@ void connection_free(connection_t *conn) {
if(conn->type == CONN_TYPE_OR) {
directory_set_dirty();
}
+#ifdef USE_ZLIB
+ if (conn->compression) {
+ if (inflateEnd(conn->decompression) != Z_OK)
+ log(LOG_ERR,"connection_free(): while closing zlib: %s",
+ conn->decompression->msg);
+ if (deflateEnd(conn->compression) != Z_OK)
+ log(LOG_ERR,"connection_free(): while closing zlib: %s",
+ conn->compression->msg);
+ free(conn->compression);
+ free(conn->decompression);
+ buf_free(conn->z_outbuf);
+ }
+#endif
free(conn);
}
@@ -337,6 +373,49 @@ int connection_fetch_from_buf(char *string, int len, connection_t *conn) {
return fetch_from_buf(string, len, &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen);
}
+#ifdef USE_ZLIB
+int connection_compress_from_buf(char *string, int len, connection_t *conn,
+ int flush) {
+ return compress_from_buf(string, len,
+ &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen,
+ conn->compression, flush);
+}
+
+int connection_decompress_to_buf(char *string, int len, connection_t *conn,
+ int flush) {
+ /* This is not sane with respect to flow control; we want to spool out to
+ * z_outbuf, but only decompress and write as needed.
+ */
+ int n;
+ struct timeval now;
+
+ if (write_to_buf(string, len,
+ &conn->z_outbuf, &conn->z_outbuflen, &conn->z_outbuf_datalen) < 0)
+ return -1;
+
+ n = decompress_buf_to_buf(
+ &conn->z_outbuf, &conn->z_outbuflen, &conn->z_outbuf_datalen,
+ &conn->outbuf, &conn->outbuflen, &conn->outbuf_datalen,
+ conn->decompression, flush);
+
+ if (n < 0)
+ return -1;
+
+ if(gettimeofday(&now,NULL) < 0)
+ return -1;
+
+ if(!n)
+ return 0;
+
+ if(conn->marked_for_close)
+ return 0;
+
+ conn->timestamp_lastwritten = now.tv_sec;
+
+ return n;
+}
+#endif
+
int connection_find_on_inbuf(char *string, int len, connection_t *conn) {
return find_on_inbuf(string, len, conn->inbuf, conn->inbuf_datalen);
}
@@ -607,7 +686,7 @@ int connection_process_inbuf(connection_t *conn) {
}
int connection_package_raw_inbuf(connection_t *conn) {
- int amount_to_process;
+ int amount_to_process, len;
cell_t cell;
circuit_t *circ;
@@ -618,13 +697,27 @@ int connection_package_raw_inbuf(connection_t *conn) {
repeat_connection_package_raw_inbuf:
amount_to_process = conn->inbuf_datalen;
-
+
if(!amount_to_process)
return 0;
/* Initialize the cell with 0's */
memset(&cell, 0, sizeof(cell_t));
+#ifdef USE_ZLIB
+ /* This compression logic is not necessarily optimal:
+ * 1) Maybe we should try to read as much as we can onto the inbuf before
+ * compressing.
+ * 2)
+ */
+ len = connection_compress_from_buf(cell.payload + TOPIC_HEADER_SIZE,
+ CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE,
+ conn, Z_SYNC_FLUSH);
+ if (len < 0)
+ return -1;
+
+ cell.length = len;
+#else
if(amount_to_process > CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE) {
cell.length = CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE;
} else {
@@ -633,6 +726,7 @@ repeat_connection_package_raw_inbuf:
if(connection_fetch_from_buf(cell.payload+TOPIC_HEADER_SIZE, cell.length, conn) < 0)
return -1;
+#endif
circ = circuit_get_by_conn(conn);
if(!circ) {
@@ -677,7 +771,7 @@ repeat_connection_package_raw_inbuf:
}
log(LOG_DEBUG,"connection_package_raw_inbuf(): receive_topicwindow at AP is %d",conn->p_receive_topicwindow);
}
- if(amount_to_process > CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE) {
+ if (conn->inbuf_datalen) {
log(LOG_DEBUG,"connection_package_raw_inbuf(): recursing.");
goto repeat_connection_package_raw_inbuf;
}
diff --git a/src/or/connection_ap.c b/src/or/connection_ap.c
index 9d28d5a4a..339c14137 100644
--- a/src/or/connection_ap.c
+++ b/src/or/connection_ap.c
@@ -418,11 +418,21 @@ int connection_ap_process_data_cell(cell_t *cell, circuit_t *circ) {
}
log(LOG_DEBUG,"connection_ap_process_data_cell(): willing to receive %d more cells from circ",conn->n_receive_topicwindow);
+#ifdef USE_ZLIB
+ if(connection_decompress_to_buf(cell->payload + TOPIC_HEADER_SIZE,
+ cell->length - TOPIC_HEADER_SIZE,
+ conn, Z_SYNC_FLUSH) < 0) {
+ log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close.");
+ conn->marked_for_close = 1;
+ return 0;
+ }
+#else
if(connection_write_to_buf(cell->payload + TOPIC_HEADER_SIZE,
cell->length - TOPIC_HEADER_SIZE, conn) < 0) {
conn->marked_for_close = 1;
return 0;
}
+#endif
if(connection_consider_sending_sendme(conn, EDGE_AP) < 0)
conn->marked_for_close = 1;
return 0;
@@ -441,6 +451,12 @@ int connection_ap_process_data_cell(cell_t *cell, circuit_t *circ) {
for(prevconn = circ->p_conn; prevconn->next_topic != conn; prevconn = prevconn->next_topic) ;
prevconn->next_topic = conn->next_topic;
#endif
+#if 0
+ conn->done_sending = 1;
+ shutdown(conn->s, 1); /* XXX check return; refactor NM */
+ if (conn->done_receiving)
+ conn->marked_for_close = 1;
+#endif
conn->marked_for_close = 1;
break;
case TOPIC_COMMAND_CONNECTED:
diff --git a/src/or/connection_exit.c b/src/or/connection_exit.c
index fb0daadc4..857dfe884 100644
--- a/src/or/connection_exit.c
+++ b/src/or/connection_exit.c
@@ -217,12 +217,22 @@ int connection_exit_process_data_cell(cell_t *cell, circuit_t *circ) {
log(LOG_DEBUG,"connection_exit_process_data_cell(): data received while resolving/connecting. Queueing.");
}
log(LOG_DEBUG,"connection_exit_process_data_cell(): put %d bytes on outbuf.",cell->length - TOPIC_HEADER_SIZE);
+#ifdef USE_ZLIB
+ if(connection_decompress_to_buf(cell->payload + TOPIC_HEADER_SIZE,
+ cell->length - TOPIC_HEADER_SIZE,
+ conn, Z_SYNC_FLUSH) < 0) {
+ log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close.");
+ conn->marked_for_close = 1;
+ return 0;
+ }
+#else
if(connection_write_to_buf(cell->payload + TOPIC_HEADER_SIZE,
cell->length - TOPIC_HEADER_SIZE, conn) < 0) {
log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close.");
conn->marked_for_close = 1;
return 0;
}
+#endif
if(connection_consider_sending_sendme(conn, EDGE_EXIT) < 0)
conn->marked_for_close = 1;
return 0;
@@ -241,6 +251,13 @@ int connection_exit_process_data_cell(cell_t *cell, circuit_t *circ) {
for(prevconn = circ->n_conn; prevconn->next_topic != conn; prevconn = prevconn->next_topic) ;
prevconn->next_topic = conn->next_topic;
#endif
+#if 0
+ conn->done_sending = 1;
+ shutdown(conn->s, 1); /* XXX check return; refactor NM */
+ if (conn->done_receiving)
+ conn->marked_for_close = 1;
+#endif
+
conn->marked_for_close = 1;
break;
case TOPIC_COMMAND_CONNECTED:
diff --git a/src/or/or.h b/src/or/or.h
index 13a24203a..a35fb26b2 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -36,6 +36,9 @@
#include <errno.h>
#include <assert.h>
#include <time.h>
+#ifdef USE_ZLIB
+#include <zlib.h>
+#endif
#include "../common/crypto.h"
#include "../common/log.h"
@@ -171,6 +174,7 @@
#define CONFIG_TYPE_INT 2
#define CONFIG_TYPE_LONG 3
#define CONFIG_TYPE_DOUBLE 4
+#define CONFIG_TYPE_BOOL 5
#define CONFIG_LINE_MAXLEN 1024
@@ -254,24 +258,39 @@ struct connection_t {
uint16_t port;
/* used by exit and ap: */
-
uint16_t topic_id;
struct connection_t *next_topic;
int n_receive_topicwindow;
int p_receive_topicwindow;
+ int done_sending;
+ int done_receiving;
+#ifdef USE_ZLIB
+ char *z_outbuf;
+ int z_outbuflen;
+ int z_outbuf_datalen;
+
+ z_stream *compression;
+ z_stream *decompression;
+#endif
+/* Used by ap: */
char socks_version;
char read_username;
+/* Used by exit and ap: */
char *dest_addr;
uint16_t dest_port; /* host order */
+/* Used by ap: */
char dest_tmp[512];
int dest_tmplen;
+/* Used by everyone */
char *address; /* strdup into this, because free_connection frees it */
+/* Used for cell connections */
crypto_pk_env_t *pkey; /* public RSA key for the other side */
+/* Used while negotiating OR/OR connections */
char nonce[8];
};
@@ -383,6 +402,7 @@ typedef struct {
char *RouterFile;
char *PrivateKeyFile;
double CoinWeight;
+ int Daemon;
int ORPort;
int OPPort;
int APPort;
@@ -421,12 +441,29 @@ int write_to_buf(char *string, int string_len,
* return total number of bytes on the buf
*/
+
int fetch_from_buf(char *string, int string_len,
char **buf, int *buflen, int *buf_datalen);
/* if there is string_len bytes in buf, write them onto string,
* then memmove buf back (that is, remove them from buf)
*/
+#ifdef USE_ZLIB
+int compress_from_buf(char *string, int string_len,
+ char **buf_in, int *buflen_in, int *buf_datalen_in,
+ z_stream *zstream, int flush);
+ /* read and compress as many characters as possible from buf, writing up to
+ * string_len of them onto string, then memmove buf back. Return number of
+ * characters written.
+ */
+
+int decompress_buf_to_buf(char **buf_in, int *buflen_in, int *buf_datalen_in,
+ char **buf_out, int *buflen_out, int *buf_datalen_out,
+ z_stream *zstream, int flush);
+ /* XXX document this NM
+ */
+#endif
+
int find_on_inbuf(char *string, int string_len,
char *buf, int buf_datalen);
/* find first instance of needle 'string' on haystack 'buf'. return how
@@ -529,6 +566,13 @@ int connection_read_to_buf(connection_t *conn);
int connection_fetch_from_buf(char *string, int len, connection_t *conn);
+#ifdef USE_ZLIB
+int connection_compress_from_buf(char *string, int len, connection_t *conn,
+ int flush);
+int connection_decompress_to_buf(char *string, int len, connection_t *conn,
+ int flush);
+#endif
+
int connection_outbuf_too_full(connection_t *conn);
int connection_find_on_inbuf(char *string, int len, connection_t *conn);
int connection_wants_to_flush(connection_t *conn);