[tor-commits] [tor/master] Refactor the streaming compression code.
nickm at torproject.org
nickm at torproject.org
Tue Apr 25 12:18:24 UTC 2017
commit 3c4459bcbf1d3a9da4a8b3a8bfed10d2e045af74
Author: Alexander Færøy <ahf at torproject.org>
Date: Mon Apr 17 14:57:37 2017 +0200
Refactor the streaming compression code.
This patch refactors our streaming compression code to allow us to
extend it with non-zlib/non-gzip based compression schemas.
See https://bugs.torproject.org/21663
---
src/common/torgzip.c | 62 ++++++++++++++++++++++++-------------------------
src/common/torgzip.h | 30 +++++++++++++-----------
src/or/buffers.c | 20 +++++++++-------
src/or/buffers.h | 2 +-
src/or/circuitlist.c | 4 ++--
src/or/connection.c | 2 +-
src/or/directory.c | 14 +++++------
src/or/dirserv.c | 2 +-
src/or/or.h | 4 ++--
src/test/test_buffers.c | 12 +++++-----
src/test/test_util.c | 28 +++++++++++-----------
11 files changed, 93 insertions(+), 87 deletions(-)
diff --git a/src/common/torgzip.c b/src/common/torgzip.c
index cbfcabd..af4bbc5 100644
--- a/src/common/torgzip.c
+++ b/src/common/torgzip.c
@@ -399,9 +399,9 @@ detect_compression_method(const char *in, size_t in_len)
}
}
-/** Internal state for an incremental zlib compression/decompression. The
- * body of this struct is not exposed. */
-struct tor_zlib_state_t {
+/** Internal state for an incremental compression/decompression. The body of
+ * this struct is not exposed. */
+struct tor_compress_state_t {
struct z_stream_s stream; /**< The zlib stream */
int compress; /**< True if we are compressing; false if we are inflating */
@@ -414,14 +414,13 @@ struct tor_zlib_state_t {
size_t allocation;
};
-/** Construct and return a tor_zlib_state_t object using <b>method</b>. If
- * <b>compress</b>, it's for compression; otherwise it's for
- * decompression. */
-tor_zlib_state_t *
-tor_zlib_new(int compress_, compress_method_t method,
- compression_level_t compression_level)
+/** Construct and return a tor_compress_state_t object using <b>method</b>. If
+ * <b>compress</b>, it's for compression; otherwise it's for decompression. */
+tor_compress_state_t *
+tor_compress_new(int compress_, compress_method_t method,
+ compression_level_t compression_level)
{
- tor_zlib_state_t *out;
+ tor_compress_state_t *out;
int bits, memlevel;
if (! compress_) {
@@ -430,7 +429,7 @@ tor_zlib_new(int compress_, compress_method_t method,
compression_level = HIGH_COMPRESSION;
}
- out = tor_malloc_zero(sizeof(tor_zlib_state_t));
+ out = tor_malloc_zero(sizeof(tor_compress_state_t));
out->stream.zalloc = Z_NULL;
out->stream.zfree = Z_NULL;
out->stream.opaque = NULL;
@@ -462,16 +461,17 @@ tor_zlib_new(int compress_, compress_method_t method,
* to *<b>out</b>, adjusting the values as we go. If <b>finish</b> is true,
* we've reached the end of the input.
*
- * Return TOR_ZLIB_DONE if we've finished the entire compression/decompression.
- * Return TOR_ZLIB_OK if we're processed everything from the input.
- * Return TOR_ZLIB_BUF_FULL if we're out of space on <b>out</b>.
- * Return TOR_ZLIB_ERR if the stream is corrupt.
+ * Return TOR_COMPRESS_DONE if we've finished the entire
+ * compression/decompression.
+ * Return TOR_COMPRESS_OK if we're processed everything from the input.
+ * Return TOR_COMPRESS_BUFFER_FULL if we're out of space on <b>out</b>.
+ * Return TOR_COMPRESS_ERROR if the stream is corrupt.
*/
-tor_zlib_output_t
-tor_zlib_process(tor_zlib_state_t *state,
- char **out, size_t *out_len,
- const char **in, size_t *in_len,
- int finish)
+tor_compress_output_t
+tor_compress_process(tor_compress_state_t *state,
+ char **out, size_t *out_len,
+ const char **in, size_t *in_len,
+ int finish)
{
int err;
tor_assert(*in_len <= UINT_MAX);
@@ -498,31 +498,31 @@ tor_zlib_process(tor_zlib_state_t *state,
if (! state->compress &&
is_compression_bomb(state->input_so_far, state->output_so_far)) {
log_warn(LD_DIR, "Possible zlib bomb; abandoning stream.");
- return TOR_ZLIB_ERR;
+ return TOR_COMPRESS_ERROR;
}
switch (err)
{
case Z_STREAM_END:
- return TOR_ZLIB_DONE;
+ return TOR_COMPRESS_DONE;
case Z_BUF_ERROR:
if (state->stream.avail_in == 0 && !finish)
- return TOR_ZLIB_OK;
- return TOR_ZLIB_BUF_FULL;
+ return TOR_COMPRESS_OK;
+ return TOR_COMPRESS_BUFFER_FULL;
case Z_OK:
if (state->stream.avail_out == 0 || finish)
- return TOR_ZLIB_BUF_FULL;
- return TOR_ZLIB_OK;
+ return TOR_COMPRESS_BUFFER_FULL;
+ return TOR_COMPRESS_OK;
default:
log_warn(LD_GENERAL, "Gzip returned an error: %s",
state->stream.msg ? state->stream.msg : "<no message>");
- return TOR_ZLIB_ERR;
+ return TOR_COMPRESS_ERROR;
}
}
/** Deallocate <b>state</b>. */
void
-tor_zlib_free(tor_zlib_state_t *state)
+tor_compress_free(tor_compress_state_t *state)
{
if (!state)
return;
@@ -553,7 +553,7 @@ tor_zlib_state_size_precalc(int inflate_, int windowbits, int memlevel)
that is, 32K for windowBits=15 (default value) plus a few kilobytes
for small objects."
*/
- return sizeof(tor_zlib_state_t) + sizeof(struct z_stream_s) +
+ return sizeof(tor_compress_state_t) + sizeof(struct z_stream_s) +
(1 << 15) + A_FEW_KILOBYTES;
} else {
/* Also from zconf.h:
@@ -562,7 +562,7 @@ tor_zlib_state_size_precalc(int inflate_, int windowbits, int memlevel)
(1 << (windowBits+2)) + (1 << (memLevel+9))
... plus a few kilobytes for small objects."
*/
- return sizeof(tor_zlib_state_t) + sizeof(struct z_stream_s) +
+ return sizeof(tor_compress_state_t) + sizeof(struct z_stream_s) +
(1 << (windowbits + 2)) + (1 << (memlevel + 9)) + A_FEW_KILOBYTES;
}
#undef A_FEW_KILOBYTES
@@ -570,7 +570,7 @@ tor_zlib_state_size_precalc(int inflate_, int windowbits, int memlevel)
/** Return the approximate number of bytes allocated for <b>state</b>. */
size_t
-tor_zlib_state_size(const tor_zlib_state_t *state)
+tor_compress_state_size(const tor_compress_state_t *state)
{
return state->allocation;
}
diff --git a/src/common/torgzip.h b/src/common/torgzip.h
index ac9763e..2b0504e 100644
--- a/src/common/torgzip.h
+++ b/src/common/torgzip.h
@@ -46,23 +46,27 @@ tor_zlib_get_header_version_str(void);
compress_method_t detect_compression_method(const char *in, size_t in_len);
-/** Return values from tor_zlib_process; see that function's documentation for
- * details. */
+/** Return values from tor_compress_process; see that function's documentation
+ * for details. */
typedef enum {
- TOR_ZLIB_OK, TOR_ZLIB_DONE, TOR_ZLIB_BUF_FULL, TOR_ZLIB_ERR
-} tor_zlib_output_t;
+ TOR_COMPRESS_OK,
+ TOR_COMPRESS_DONE,
+ TOR_COMPRESS_BUFFER_FULL,
+ TOR_COMPRESS_ERROR
+} tor_compress_output_t;
/** Internal state for an incremental zlib compression/decompression. */
-typedef struct tor_zlib_state_t tor_zlib_state_t;
-tor_zlib_state_t *tor_zlib_new(int compress, compress_method_t method,
- compression_level_t level);
+typedef struct tor_compress_state_t tor_compress_state_t;
+tor_compress_state_t *tor_compress_new(int compress,
+ compress_method_t method,
+ compression_level_t level);
-tor_zlib_output_t tor_zlib_process(tor_zlib_state_t *state,
- char **out, size_t *out_len,
- const char **in, size_t *in_len,
- int finish);
-void tor_zlib_free(tor_zlib_state_t *state);
+tor_compress_output_t tor_compress_process(tor_compress_state_t *state,
+ char **out, size_t *out_len,
+ const char **in, size_t *in_len,
+ int finish);
+void tor_compress_free(tor_compress_state_t *state);
-size_t tor_zlib_state_size(const tor_zlib_state_t *state);
+size_t tor_compress_state_size(const tor_compress_state_t *state);
size_t tor_zlib_get_total_allocation(void);
#endif
diff --git a/src/or/buffers.c b/src/or/buffers.c
index e559f80..3490ce4 100644
--- a/src/or/buffers.c
+++ b/src/or/buffers.c
@@ -2088,11 +2088,11 @@ fetch_from_buf_line(buf_t *buf, char *data_out, size_t *data_len)
}
/** Compress on uncompress the <b>data_len</b> bytes in <b>data</b> using the
- * zlib state <b>state</b>, appending the result to <b>buf</b>. If
+ * compression state <b>state</b>, appending the result to <b>buf</b>. If
* <b>done</b> is true, flush the data in the state and finish the
* compression/uncompression. Return -1 on failure, 0 on success. */
int
-write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
+write_to_buf_zlib(buf_t *buf, tor_compress_state_t *state,
const char *data, size_t data_len,
int done)
{
@@ -2108,20 +2108,22 @@ write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
}
next = CHUNK_WRITE_PTR(buf->tail);
avail = old_avail = CHUNK_REMAINING_CAPACITY(buf->tail);
- switch (tor_zlib_process(state, &next, &avail, &data, &data_len, done)) {
- case TOR_ZLIB_DONE:
+ switch (tor_compress_process(state, &next, &avail,
+ &data, &data_len, done)) {
+ case TOR_COMPRESS_DONE:
over = 1;
break;
- case TOR_ZLIB_ERR:
+ case TOR_COMPRESS_ERROR:
return -1;
- case TOR_ZLIB_OK:
+ case TOR_COMPRESS_OK:
if (data_len == 0)
over = 1;
break;
- case TOR_ZLIB_BUF_FULL:
+ case TOR_COMPRESS_BUFFER_FULL:
if (avail) {
- /* Zlib says we need more room (ZLIB_BUF_FULL). Start a new chunk
- * automatically, whether were going to or not. */
+ /* The compression module says we need more room
+ * (TOR_COMPRESS_BUFFER_FULL). Start a new chunk automatically,
+ * whether were going to or not. */
need_new_chunk = 1;
}
break;
diff --git a/src/or/buffers.h b/src/or/buffers.h
index c6a5ffa..49a2ae4 100644
--- a/src/or/buffers.h
+++ b/src/or/buffers.h
@@ -36,7 +36,7 @@ int flush_buf(tor_socket_t s, buf_t *buf, size_t sz, size_t *buf_flushlen);
int flush_buf_tls(tor_tls_t *tls, buf_t *buf, size_t sz, size_t *buf_flushlen);
int write_to_buf(const char *string, size_t string_len, buf_t *buf);
-int write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
+int write_to_buf_zlib(buf_t *buf, tor_compress_state_t *state,
const char *data, size_t data_len, int done);
int move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen);
int fetch_from_buf(char *string, size_t string_len, buf_t *buf);
diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c
index 80bb7f6..4f11586 100644
--- a/src/or/circuitlist.c
+++ b/src/or/circuitlist.c
@@ -1991,8 +1991,8 @@ single_conn_free_bytes(connection_t *conn)
if (conn->type == CONN_TYPE_DIR) {
dir_connection_t *dir_conn = TO_DIR_CONN(conn);
if (dir_conn->zlib_state) {
- result += tor_zlib_state_size(dir_conn->zlib_state);
- tor_zlib_free(dir_conn->zlib_state);
+ result += tor_compress_state_size(dir_conn->zlib_state);
+ tor_compress_free(dir_conn->zlib_state);
dir_conn->zlib_state = NULL;
}
}
diff --git a/src/or/connection.c b/src/or/connection.c
index 09e316d..ad97c3b 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -628,7 +628,7 @@ connection_free_(connection_t *conn)
dir_connection_t *dir_conn = TO_DIR_CONN(conn);
tor_free(dir_conn->requested_resource);
- tor_zlib_free(dir_conn->zlib_state);
+ tor_compress_free(dir_conn->zlib_state);
if (dir_conn->spool) {
SMARTLIST_FOREACH(dir_conn->spool, spooled_resource_t *, spooled,
spooled_resource_free(spooled));
diff --git a/src/or/directory.c b/src/or/directory.c
index e7a71dd..5ae61e4 100644
--- a/src/or/directory.c
+++ b/src/or/directory.c
@@ -3178,7 +3178,7 @@ handle_get_current_consensus(dir_connection_t *conn,
write_http_response_header(conn, -1, compressed,
smartlist_len(conn->spool) == 1 ? lifetime : 0);
if (! compressed)
- conn->zlib_state = tor_zlib_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
+ conn->zlib_state = tor_compress_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
/* Prime the connection with some data. */
const int initial_flush_result = connection_dirserv_flushed_some(conn);
@@ -3276,8 +3276,8 @@ handle_get_status_vote(dir_connection_t *conn, const get_handler_args_t *args)
if (smartlist_len(items)) {
if (compressed) {
- conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
- choose_compression_level(estimated_len));
+ conn->zlib_state = tor_compress_new(1, ZLIB_METHOD,
+ choose_compression_level(estimated_len));
SMARTLIST_FOREACH(items, const char *, c,
connection_write_to_buf_zlib(c, strlen(c), conn, 0));
connection_write_to_buf_zlib("", 0, conn, 1);
@@ -3335,7 +3335,7 @@ handle_get_microdesc(dir_connection_t *conn, const get_handler_args_t *args)
write_http_response_header(conn, -1, compressed, MICRODESC_CACHE_LIFETIME);
if (compressed)
- conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
+ conn->zlib_state = tor_compress_new(1, ZLIB_METHOD,
choose_compression_level(size_guess));
const int initial_flush_result = connection_dirserv_flushed_some(conn);
@@ -3428,7 +3428,7 @@ handle_get_descriptor(dir_connection_t *conn, const get_handler_args_t *args)
}
write_http_response_header(conn, -1, compressed, cache_lifetime);
if (compressed)
- conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
+ conn->zlib_state = tor_compress_new(1, ZLIB_METHOD,
choose_compression_level(size_guess));
clear_spool = 0;
/* Prime the connection with some data. */
@@ -3519,8 +3519,8 @@ handle_get_keys(dir_connection_t *conn, const get_handler_args_t *args)
write_http_response_header(conn, compressed?-1:len, compressed, 60*60);
if (compressed) {
- conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
- choose_compression_level(len));
+ conn->zlib_state = tor_compress_new(1, ZLIB_METHOD,
+ choose_compression_level(len));
SMARTLIST_FOREACH(certs, authority_cert_t *, c,
connection_write_to_buf_zlib(c->cache_info.signed_descriptor_body,
c->cache_info.signed_descriptor_len,
diff --git a/src/or/dirserv.c b/src/or/dirserv.c
index 87afd69..186478c 100644
--- a/src/or/dirserv.c
+++ b/src/or/dirserv.c
@@ -3792,7 +3792,7 @@ connection_dirserv_flushed_some(dir_connection_t *conn)
/* Flush the zlib state: there could be more bytes pending in there, and
* we don't want to omit bytes. */
connection_write_to_buf_zlib("", 0, conn, 1);
- tor_zlib_free(conn->zlib_state);
+ tor_compress_free(conn->zlib_state);
conn->zlib_state = NULL;
}
return 0;
diff --git a/src/or/or.h b/src/or/or.h
index a7b3a66..6aec588 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -1773,8 +1773,8 @@ typedef struct dir_connection_t {
/** List of spooled_resource_t for objects that we're spooling. We use
* it from back to front. */
smartlist_t *spool;
- /** The zlib object doing on-the-fly compression for spooled data. */
- tor_zlib_state_t *zlib_state;
+ /** The compression object doing on-the-fly compression for spooled data. */
+ tor_compress_state_t *zlib_state;
/** What rendezvous service are we querying for? */
rend_data_t *rend_data;
diff --git a/src/test/test_buffers.c b/src/test/test_buffers.c
index f0edd42..a6f0309 100644
--- a/src/test/test_buffers.c
+++ b/src/test/test_buffers.c
@@ -584,12 +584,12 @@ test_buffers_zlib_impl(int finalize_with_nil)
char *contents = NULL;
char *expanded = NULL;
buf_t *buf = NULL;
- tor_zlib_state_t *zlib_state = NULL;
+ tor_compress_state_t *zlib_state = NULL;
size_t out_len, in_len;
int done;
buf = buf_new_with_capacity(128); /* will round up */
- zlib_state = tor_zlib_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
+ zlib_state = tor_compress_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
msg = tor_malloc(512);
crypto_rand(msg, 512);
@@ -621,7 +621,7 @@ test_buffers_zlib_impl(int finalize_with_nil)
done:
buf_free(buf);
- tor_zlib_free(zlib_state);
+ tor_compress_free(zlib_state);
tor_free(contents);
tor_free(expanded);
tor_free(msg);
@@ -647,7 +647,7 @@ test_buffers_zlib_fin_at_chunk_end(void *arg)
char *contents = NULL;
char *expanded = NULL;
buf_t *buf = NULL;
- tor_zlib_state_t *zlib_state = NULL;
+ tor_compress_state_t *zlib_state = NULL;
size_t out_len, in_len;
size_t sz, headerjunk;
(void) arg;
@@ -666,7 +666,7 @@ test_buffers_zlib_fin_at_chunk_end(void *arg)
tt_uint_op(buf->head->datalen, OP_EQ, headerjunk);
tt_uint_op(buf_datalen(buf), OP_EQ, headerjunk);
/* Write an empty string, with finalization on. */
- zlib_state = tor_zlib_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
+ zlib_state = tor_compress_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
tt_int_op(write_to_buf_zlib(buf, zlib_state, "", 0, 1), OP_EQ, 0);
in_len = buf_datalen(buf);
@@ -687,7 +687,7 @@ test_buffers_zlib_fin_at_chunk_end(void *arg)
done:
buf_free(buf);
- tor_zlib_free(zlib_state);
+ tor_compress_free(zlib_state);
tor_free(contents);
tor_free(expanded);
tor_free(msg);
diff --git a/src/test/test_util.c b/src/test/test_util.c
index 7e24279..dacf56f 100644
--- a/src/test/test_util.c
+++ b/src/test/test_util.c
@@ -2249,7 +2249,7 @@ test_util_gzip(void *arg)
char *buf1=NULL, *buf2=NULL, *buf3=NULL, *cp1, *cp2;
const char *ccp2;
size_t len1, len2;
- tor_zlib_state_t *state = NULL;
+ tor_compress_state_t *state = NULL;
(void)arg;
buf1 = tor_strdup("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZAAAAAAAAAAAAAAAAAAAZ");
@@ -2320,21 +2320,21 @@ test_util_gzip(void *arg)
tor_free(buf1);
tor_free(buf2);
tor_free(buf3);
- state = tor_zlib_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
+ state = tor_compress_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
tt_assert(state);
cp1 = buf1 = tor_malloc(1024);
len1 = 1024;
ccp2 = "ABCDEFGHIJABCDEFGHIJ";
len2 = 21;
- tt_assert(tor_zlib_process(state, &cp1, &len1, &ccp2, &len2, 0)
- == TOR_ZLIB_OK);
+ tt_int_op(tor_compress_process(state, &cp1, &len1, &ccp2, &len2, 0),
+ OP_EQ, TOR_COMPRESS_OK);
tt_int_op(0, OP_EQ, len2); /* Make sure we compressed it all. */
tt_assert(cp1 > buf1);
len2 = 0;
cp2 = cp1;
- tt_assert(tor_zlib_process(state, &cp1, &len1, &ccp2, &len2, 1)
- == TOR_ZLIB_DONE);
+ tt_int_op(tor_compress_process(state, &cp1, &len1, &ccp2, &len2, 1),
+ OP_EQ, TOR_COMPRESS_DONE);
tt_int_op(0, OP_EQ, len2);
tt_assert(cp1 > cp2); /* Make sure we really added something. */
@@ -2346,7 +2346,7 @@ test_util_gzip(void *arg)
done:
if (state)
- tor_zlib_free(state);
+ tor_compress_free(state);
tor_free(buf2);
tor_free(buf3);
tor_free(buf1);
@@ -2364,7 +2364,7 @@ test_util_gzip_compression_bomb(void *arg)
char *one_mb = tor_malloc_zero(one_million);
char *result = NULL;
size_t result_len = 0;
- tor_zlib_state_t *state = NULL;
+ tor_compress_state_t *state = NULL;
/* Make sure we can't produce a compression bomb */
setup_full_capture_of_logs(LOG_WARN);
@@ -2386,22 +2386,22 @@ test_util_gzip_compression_bomb(void *arg)
ZLIB_METHOD, 0, LOG_WARN));
/* Now try streaming that. */
- state = tor_zlib_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
- tor_zlib_output_t r;
+ state = tor_compress_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
+ tor_compress_output_t r;
const char *inp = compression_bomb;
size_t inlen = 1039;
do {
char *outp = one_mb;
size_t outleft = 4096; /* small on purpose */
- r = tor_zlib_process(state, &outp, &outleft, &inp, &inlen, 0);
+ r = tor_compress_process(state, &outp, &outleft, &inp, &inlen, 0);
tt_int_op(inlen, OP_NE, 0);
- } while (r == TOR_ZLIB_BUF_FULL);
+ } while (r == TOR_COMPRESS_BUFFER_FULL);
- tt_int_op(r, OP_EQ, TOR_ZLIB_ERR);
+ tt_int_op(r, OP_EQ, TOR_COMPRESS_ERROR);
done:
tor_free(one_mb);
- tor_zlib_free(state);
+ tor_compress_free(state);
}
/** Run unit tests for mmap() wrapper functionality. */
More information about the tor-commits
mailing list