[tor-commits] [obfsproxy/master] A conn_t now wraps only one socket. A new struct, circuit_t, associates the two sides of a proxy channel.
nickm at torproject.org
nickm at torproject.org
Fri Sep 9 17:08:58 UTC 2011
commit 99dee61ff3aa70aab43b9543a8fd0c58858f0dcd
Author: Zack Weinberg <zackw at panix.com>
Date: Wed Aug 3 11:20:17 2011 -0700
A conn_t now wraps only one socket. A new struct, circuit_t, associates the two sides of a proxy channel.
---
src/network.c | 538 ++++++++++++++++++++++++++++++++------------------------
src/network.h | 32 +++-
src/util.h | 1 +
3 files changed, 331 insertions(+), 240 deletions(-)
diff --git a/src/network.c b/src/network.c
index 4ae2c15..08ce696 100644
--- a/src/network.c
+++ b/src/network.c
@@ -20,25 +20,27 @@
/* Terminology used in this file:
- A "side" is a bidirectional communications channel, usually backed
- by a network socket and represented at this layer by a
- 'struct bufferevent'.
-
- A "connection" is a _pair_ of sides, referred to as the "upstream"
- side and the "downstream" side. A connection is represented by a
- 'conn_t'. The upstream side of a connection communicates in
- cleartext with the higher-level program that wishes to make use of
- our obfuscation service. The downstream side commmunicates in an
- obfuscated fashion with the remote peer that the higher-level
- client wishes to contact.
+ A "connection" is a bidirectional communications channel, usually
+ backed by a network socket, and represented in this layer by a
+ 'conn_t', wrapping a 'struct bufferevent'.
+
+ A "circuit" is a _pair_ of connections, referred to as the
+ "upstream" and "downstream" connections. A circuit is represented
+ by a 'circuit_t'. The upstream connection of a circuit
+ communicates in cleartext with the higher-level program that wishes
+ to make use of our obfuscation service. The downstream connection
+ communicates in an obfuscated fashion with the remote peer that
+ the higher-level client wishes to contact.
A "listener" is a listening socket bound to a particular
- obfuscation protocol, represented in this layer by a 'listener_t'.
- Connecting to a listener creates one side of a connection, and
- causes this program to initiate the other side of the connection.
- A listener is said to be a "client" listener if connecting to it
- creates the _upstream_ side of a connection, and a "server"
- listener if connecting to it creates the _downstream_ side.
+ obfuscation protocol, represented in this layer by a 'listener_t'
+ and its 'config_t'. Connecting to a listener creates one
+ connection of a circuit, and causes this program to initiate the
+ other connection (possibly after receiving in-band instructions
+ about where to connect to). A listener is said to be a "client"
+ listener if connecting to it creates the _upstream_ connection, and
+ a "server" listener if connecting to it creates the _downstream_
+ connection.
There are two kinds of client listeners: a "simple" client listener
always connects to the same remote peer every time it needs to
@@ -72,19 +74,17 @@ static void listener_cb(struct evconnlistener *evcl, evutil_socket_t fd,
struct sockaddr *sourceaddr, int socklen,
void *closure);
-static void simple_client_listener_cb(conn_t *conn, struct bufferevent *buf);
-static void socks_client_listener_cb(conn_t *conn, struct bufferevent *buf);
-static void simple_server_listener_cb(conn_t *conn, struct bufferevent *buf);
+static void simple_client_listener_cb(conn_t *conn);
+static void socks_client_listener_cb(conn_t *conn);
+static void simple_server_listener_cb(conn_t *conn);
static void conn_free(conn_t *conn);
-static void close_conn(conn_t *conn);
-static void close_all_connections(void);
+static void conn_free_all(void);
+static void conn_free_on_flush(struct bufferevent *bev, void *arg);
-static void close_conn_on_flush(struct bufferevent *bev, void *arg);
-
-static struct bufferevent *open_outbound_socket(conn_t *conn,
- struct event_base *base,
- bufferevent_data_cb readcb);
+static int circuit_create(conn_t *upstream, conn_t *downstream);
+static void circuit_free(circuit_t *circuit);
+static conn_t *open_outbound(conn_t *conn, bufferevent_data_cb readcb);
static void upstream_read_cb(struct bufferevent *bev, void *arg);
static void downstream_read_cb(struct bufferevent *bev, void *arg);
@@ -114,7 +114,7 @@ start_shutdown(int barbaric)
shutting_down=1;
if (barbaric)
- close_all_connections();
+ conn_free_all();
if (connections && smartlist_len(connections) == 0) {
smartlist_free(connections);
@@ -129,7 +129,7 @@ start_shutdown(int barbaric)
Closes all open connections.
*/
static void
-close_all_connections(void)
+conn_free_all(void)
{
if (!connections)
return;
@@ -251,10 +251,11 @@ listener_cb(struct evconnlistener *evcl, evutil_socket_t fd,
smartlist_len(connections));
conn->peername = peername;
+ conn->buffer = buf;
switch (conn->mode) {
- case LSN_SIMPLE_CLIENT: simple_client_listener_cb(conn, buf); break;
- case LSN_SOCKS_CLIENT: socks_client_listener_cb(conn, buf); break;
- case LSN_SIMPLE_SERVER: simple_server_listener_cb(conn, buf); break;
+ case LSN_SIMPLE_CLIENT: simple_client_listener_cb(conn); break;
+ case LSN_SOCKS_CLIENT: socks_client_listener_cb(conn); break;
+ case LSN_SIMPLE_SERVER: simple_server_listener_cb(conn); break;
default:
obfs_abort();
}
@@ -265,23 +266,16 @@ listener_cb(struct evconnlistener *evcl, evutil_socket_t fd,
simple client mode.
*/
static void
-simple_client_listener_cb(conn_t *conn, struct bufferevent *buf)
+simple_client_listener_cb(conn_t *conn)
{
- struct event_base *base = bufferevent_get_base(buf);
- obfs_assert(buf);
obfs_assert(conn);
obfs_assert(conn->mode == LSN_SIMPLE_CLIENT);
log_debug("%s: simple client connection", conn->peername);
- conn->upstream = buf;
- bufferevent_setcb(conn->upstream, upstream_read_cb, NULL, error_cb, conn);
-
- /* Don't enable the upstream side for reading at this point; wait
- till the downstream side is established. */
+ bufferevent_setcb(conn->buffer, upstream_read_cb, NULL, error_cb, conn);
- conn->downstream = open_outbound_socket(conn, base, downstream_read_cb);
- if (!conn->downstream) {
- close_conn(conn);
+ if (circuit_create(conn, open_outbound(conn, downstream_read_cb))) {
+ conn_free(conn);
return;
}
@@ -293,22 +287,19 @@ simple_client_listener_cb(conn_t *conn, struct bufferevent *buf)
socks mode.
*/
static void
-socks_client_listener_cb(conn_t *conn, struct bufferevent *buf)
+socks_client_listener_cb(conn_t *conn)
{
- obfs_assert(buf);
obfs_assert(conn);
obfs_assert(conn->mode == LSN_SOCKS_CLIENT);
log_debug("%s: socks client connection", conn->peername);
- conn->upstream = buf;
- bufferevent_setcb(conn->upstream, socks_read_cb, NULL, error_cb, conn);
- bufferevent_enable(conn->upstream, EV_READ|EV_WRITE);
-
- /* Construct SOCKS state. */
conn->socks_state = socks_state_new();
- /* Do not create a downstream bufferevent at this time; the socks
- handler will do it after it learns the downstream peer address. */
+ bufferevent_setcb(conn->buffer, socks_read_cb, NULL, error_cb, conn);
+ bufferevent_enable(conn->buffer, EV_READ|EV_WRITE);
+
+ /* Do not create a circuit at this time; the socks handler will do
+ it after it learns the remote peer address. */
log_debug("%s: setup complete", conn->peername);
}
@@ -318,25 +309,16 @@ socks_client_listener_cb(conn_t *conn, struct bufferevent *buf)
server mode.
*/
static void
-simple_server_listener_cb(conn_t *conn, struct bufferevent *buf)
+simple_server_listener_cb(conn_t *conn)
{
- struct event_base *base = bufferevent_get_base(buf);
- obfs_assert(buf);
obfs_assert(conn);
obfs_assert(conn->mode == LSN_SIMPLE_SERVER);
log_debug("%s: server connection", conn->peername);
- conn->downstream = buf;
- bufferevent_setcb(conn->downstream,
- downstream_read_cb, NULL, error_cb, conn);
-
- /* Don't enable the downstream side for reading at this point; wait
- till the upstream side is established. */
+ bufferevent_setcb(conn->buffer, downstream_read_cb, NULL, error_cb, conn);
- /* New bufferevent to connect to the target address. */
- conn->upstream = open_outbound_socket(conn, base, upstream_read_cb);
- if (!conn->upstream) {
- close_conn(conn);
+ if (circuit_create(open_outbound(conn, upstream_read_cb), conn)) {
+ conn_free(conn);
return;
}
@@ -349,36 +331,29 @@ simple_server_listener_cb(conn_t *conn, struct bufferevent *buf)
static void
conn_free(conn_t *conn)
{
- if (conn->peername)
- free(conn->peername);
- if (conn->socks_state)
- socks_state_free(conn->socks_state);
- if (conn->upstream)
- bufferevent_free(conn->upstream);
- if (conn->downstream)
- bufferevent_free(conn->downstream);
-
- proto_conn_free(conn);
-}
-
-/**
- Closes a fully open connection.
-*/
-static void
-close_conn(conn_t *conn)
-{
- obfs_assert(connections);
- log_debug("Closing connection from %s; %d remaining",
- conn->peername, smartlist_len(connections) - 1);
-
- smartlist_remove(connections, conn);
- conn_free(conn);
-
- /* If this was the last connection AND we are shutting down,
- finish shutdown. */
- if (smartlist_len(connections) == 0 && shutting_down) {
- smartlist_free(connections);
- finish_shutdown();
+ if (conn->circuit)
+ circuit_free(conn->circuit); /* will recurse and take care of us */
+ else {
+ if (connections) {
+ smartlist_remove(connections, conn);
+ log_debug("Closing connection with %s; %d remaining",
+ conn->peername, smartlist_len(connections));
+ }
+ if (conn->peername)
+ free(conn->peername);
+ if (conn->socks_state)
+ socks_state_free(conn->socks_state);
+ if (conn->buffer)
+ bufferevent_free(conn->buffer);
+ proto_conn_free(conn);
+
+ /* If this was the last connection AND we are shutting down,
+ finish shutdown. */
+ if (shutting_down && (!connections || smartlist_len(connections) == 0)) {
+ if (connections)
+ smartlist_free(connections);
+ finish_shutdown();
+ }
}
}
@@ -387,25 +362,51 @@ close_conn(conn_t *conn)
empty.
*/
static void
-close_conn_on_flush(struct bufferevent *bev, void *arg)
+conn_free_on_flush(struct bufferevent *bev, void *arg)
{
conn_t *conn = arg;
log_debug("%s for %s", __func__, conn->peername);
if (evbuffer_get_length(bufferevent_get_output(bev)) == 0)
- close_conn(conn);
+ conn_free(conn);
+}
+
+/**
+ Join |up| and |down| into a newly allocated circuit and point both
+ connections back at it. Returns 0 on success, or -1 (allocating
+ nothing and freeing nothing) if either connection is NULL.
+*/
+static int
+circuit_create(conn_t *up, conn_t *down)
+{
+ circuit_t *circ;
+
+ if (up == NULL || down == NULL)
+ return -1;
+
+ circ = xzalloc(sizeof(circuit_t));
+
+ circ->upstream = up;
+ up->circuit = circ;
+
+ circ->downstream = down;
+ down->circuit = circ;
+
+ return 0;
+}
+
+/**
+ Tear down |circuit|: release both of its connections, then the
+ circuit structure itself.
+*/
+static void
+circuit_free(circuit_t *circuit)
+{
+ conn_t *up = circuit->upstream;
+ conn_t *down = circuit->downstream;
+
+ /* Detach both ends first; conn_free() would otherwise see a
+ non-NULL ->circuit and call straight back into this function. */
+ up->circuit = NULL;
+ down->circuit = NULL;
+
+ conn_free(up);
+ conn_free(down);
+ free(circuit);
+}
/**
- Make the outbound socket for a connection.
+ Make the outbound connection for a circuit.
*/
-static struct bufferevent *
-open_outbound_socket(conn_t *conn, struct event_base *base,
- bufferevent_data_cb readcb)
+static conn_t *
+open_outbound(conn_t *conn, bufferevent_data_cb readcb)
{
struct evutil_addrinfo *addr = config_get_target_addr(conn->cfg);
+ struct event_base *base = bufferevent_get_base(conn->buffer);
struct bufferevent *buf;
char *peername;
+ conn_t *newconn;
if (!addr) {
log_warn("%s: no target addresses available", conn->peername);
@@ -418,18 +419,23 @@ open_outbound_socket(conn_t *conn, struct event_base *base,
return NULL;
}
- bufferevent_setcb(buf, readcb, NULL, pending_conn_cb, conn);
+ newconn = proto_conn_create(conn->cfg);
+ if (!conn) {
+ log_warn("%s: failed to allocate state for outbound connection",
+ conn->peername);
+ bufferevent_free(buf);
+ return NULL;
+ }
+
+ newconn->buffer = buf;
+ bufferevent_setcb(buf, readcb, NULL, pending_conn_cb, newconn);
do {
peername = printable_address(addr->ai_addr, addr->ai_addrlen);
log_info("%s (%s): trying to connect to %s",
conn->peername, conn->cfg->vtable->name, peername);
- if (bufferevent_socket_connect(buf, addr->ai_addr, addr->ai_addrlen) >= 0) {
- /* success */
- bufferevent_enable(buf, EV_READ|EV_WRITE);
- free(peername);
- return buf;
- }
+ if (bufferevent_socket_connect(buf, addr->ai_addr, addr->ai_addrlen) >= 0)
+ goto success;
log_info("%s: connection to %s failed: %s",
conn->peername, peername,
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
@@ -440,8 +446,55 @@ open_outbound_socket(conn_t *conn, struct event_base *base,
log_warn("%s: all outbound connection attempts failed",
conn->peername);
- bufferevent_free(buf);
+ conn_free(newconn);
return NULL;
+
+ success:
+ bufferevent_enable(buf, EV_READ|EV_WRITE);
+ newconn->peername = peername;
+ obfs_assert(connections);
+ smartlist_add(connections, newconn);
+ return newconn;
+}
+
+/**
+ As open_outbound, but uses bufferevent_socket_connect_hostname
+ rather than bufferevent_socket_connect.
+
+ |conn| supplies the event base, the configuration and the peer name
+ used in log messages; |af|, |addr| and |port| describe the target.
+ Returns the new connection, already added to |connections|, or NULL
+ on failure (in which case everything allocated here has been freed).
+*/
+static conn_t *
+open_outbound_hostname(conn_t *conn, int af, const char *addr, int port)
+{
+ struct event_base *base = bufferevent_get_base(conn->buffer);
+ struct bufferevent *buf;
+ conn_t *newconn;
+
+ buf = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
+ if (!buf) {
+ log_warn("%s: unable to create outbound socket buffer", conn->peername);
+ return NULL;
+ }
+ newconn = proto_conn_create(conn->cfg);
+ /* Check the freshly created connection, not the caller's |conn|
+ (which is known to be valid here); otherwise an allocation
+ failure would be dereferenced just below. */
+ if (!newconn) {
+ log_warn("%s: failed to allocate state for outbound connection",
+ conn->peername);
+ bufferevent_free(buf);
+ return NULL;
+ }
+ /* From here on |newconn| owns |buf|; conn_free releases both. */
+ newconn->buffer = buf;
+ bufferevent_setcb(buf, downstream_read_cb, NULL, pending_socks_cb, newconn);
+ if (bufferevent_socket_connect_hostname(buf, get_evdns_base(),
+ af, addr, port) < 0) {
+ log_warn("%s: outbound connection to %s:%d failed: %s",
+ conn->peername, addr, port,
+ evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
+ conn_free(newconn);
+ return NULL;
+ }
+
+ bufferevent_enable(buf, EV_READ|EV_WRITE);
+ obfs_assert(connections);
+ smartlist_add(connections, newconn);
+ return newconn;
}
/**
@@ -452,9 +505,7 @@ socks_read_cb(struct bufferevent *bev, void *arg)
{
conn_t *conn = arg;
enum socks_ret socks_ret;
- log_debug("%s for %s", __func__, conn->peername);
- /* socks only makes sense on the upstream side */
- obfs_assert(bev == conn->upstream);
+ log_debug("%s: %s", conn->peername, __func__);
do {
enum socks_status_t status = socks_state_get_status(conn->socks_state);
@@ -466,27 +517,16 @@ socks_read_cb(struct bufferevent *bev, void *arg)
const char *addr=NULL;
r = socks_state_get_address(conn->socks_state, &af, &addr, &port);
obfs_assert(r==0);
- conn->downstream =
- bufferevent_socket_new(bufferevent_get_base(conn->upstream),
- -1, BEV_OPT_CLOSE_ON_FREE);
-
- bufferevent_setcb(conn->downstream,
- downstream_read_cb, NULL, pending_socks_cb, conn);
-
- r = bufferevent_socket_connect_hostname(conn->downstream,
- get_evdns_base(),
- af, addr, port);
- bufferevent_enable(conn->downstream, EV_READ|EV_WRITE);
- log_debug("socket_connect_hostname said %d! (%s,%d)", r, addr, port);
-
- if (r < 0) {
+ log_info("%s: socks: trying to connect to %s:%u",
+ conn->peername, addr, port);
+ if (circuit_create(conn, open_outbound_hostname(conn, af, addr, port))) {
/* XXXX send socks reply */
- close_conn(conn);
+ conn_free(conn);
return;
}
/* further upstream data will be processed once the downstream
side is established */
- bufferevent_disable(conn->upstream, EV_READ|EV_WRITE);
+ bufferevent_disable(conn->buffer, EV_READ|EV_WRITE);
return;
}
@@ -498,14 +538,14 @@ socks_read_cb(struct bufferevent *bev, void *arg)
if (socks_ret == SOCKS_INCOMPLETE)
return; /* need to read more data. */
else if (socks_ret == SOCKS_BROKEN)
- close_conn(conn); /* XXXX send socks reply */
+ conn_free(conn); /* XXXX send socks reply */
else if (socks_ret == SOCKS_CMD_NOT_CONNECT) {
bufferevent_enable(bev, EV_WRITE);
bufferevent_disable(bev, EV_READ);
socks5_send_reply(bufferevent_get_output(bev), conn->socks_state,
SOCKS5_FAILED_UNSUPPORTED);
bufferevent_setcb(bev, NULL,
- close_conn_on_flush, flush_error_cb, conn);
+ conn_free_on_flush, flush_error_cb, conn);
return;
}
}
@@ -518,17 +558,27 @@ socks_read_cb(struct bufferevent *bev, void *arg)
static void
upstream_read_cb(struct bufferevent *bev, void *arg)
{
- conn_t *conn = arg;
- log_debug("%s: %s, %lu bytes available", conn->peername, __func__,
+ conn_t *up = arg;
+ conn_t *down;
+ log_debug("%s: %s, %lu bytes available", up->peername, __func__,
(unsigned long)evbuffer_get_length(bufferevent_get_input(bev)));
- obfs_assert(bev == conn->upstream);
- if (proto_send(conn,
- bufferevent_get_input(conn->upstream),
- bufferevent_get_output(conn->downstream)) < 0) {
- log_debug("%s: Error during transmit.", conn->peername);
- close_conn(conn);
+ obfs_assert(up->buffer == bev);
+ obfs_assert(!up->flushing);
+ obfs_assert(up->is_open);
+ obfs_assert(up->circuit);
+ obfs_assert(up->circuit->downstream);
+
+ down = up->circuit->downstream;
+ if (proto_send(up,
+ bufferevent_get_input(up->buffer),
+ bufferevent_get_output(down->buffer))) {
+ log_debug("%s: error during transmit.", up->peername);
+ conn_free(up);
+ /* conn_free() went through circuit_free() and released both ends
+ of the circuit, so |down| must not be touched after this. */
+ return;
}
+ log_debug("%s: transmitted %lu bytes", down->peername,
+ (unsigned long)
+ evbuffer_get_length(bufferevent_get_output(down->buffer)));
}
/**
@@ -539,28 +589,45 @@ upstream_read_cb(struct bufferevent *bev, void *arg)
static void
downstream_read_cb(struct bufferevent *bev, void *arg)
{
- conn_t *conn = arg;
+ conn_t *down = arg;
+ conn_t *up;
enum recv_ret r;
- log_debug("%s: %s, %lu bytes available", conn->peername, __func__,
+
+ log_debug("%s: %s, %lu bytes available", down->peername, __func__,
(unsigned long)evbuffer_get_length(bufferevent_get_input(bev)));
- obfs_assert(bev == conn->downstream);
- r = proto_recv(conn,
- bufferevent_get_input(conn->downstream),
- bufferevent_get_output(conn->upstream));
+ obfs_assert(down->buffer == bev);
+ obfs_assert(!down->flushing);
+ obfs_assert(down->is_open);
+ obfs_assert(down->circuit);
+ obfs_assert(down->circuit->upstream);
+ up = down->circuit->upstream;
+
+
+ r = proto_recv(down,
+ bufferevent_get_input(down->buffer),
+ bufferevent_get_output(up->buffer));
if (r == RECV_BAD) {
- log_debug("%s: Error during receive.", conn->peername);
- close_conn(conn);
- } else if (r == RECV_SEND_PENDING) {
- log_debug("%s: Reply of %lu bytes", conn->peername,
+ log_debug("%s: error during receive.", down->peername);
+ conn_free(down);
+ } else {
+ log_debug("%s: forwarded %lu bytes", down->peername,
(unsigned long)
- evbuffer_get_length(bufferevent_get_input(conn->upstream)));
- if (proto_send(conn,
- bufferevent_get_input(conn->upstream),
- bufferevent_get_output(conn->downstream)) < 0) {
- log_debug("%s: Error during reply.", conn->peername);
- close_conn(conn);
+ evbuffer_get_length(bufferevent_get_output(up->buffer)));
+ if (r == RECV_SEND_PENDING) {
+ log_debug("%s: reply of %lu bytes", down->peername,
+ (unsigned long)
+ evbuffer_get_length(bufferevent_get_input(up->buffer)));
+ if (proto_send(up,
+ bufferevent_get_input(up->buffer),
+ bufferevent_get_output(down->buffer)) < 0) {
+ log_debug("%s: error during reply.", down->peername);
+ conn_free(down);
+ /* conn_free() released both ends of the circuit; skip the
+ trailing log, which would read freed memory. */
+ return;
+ }
+ log_debug("%s: transmitted %lu bytes", down->peername,
+ (unsigned long)
+ evbuffer_get_length(bufferevent_get_output(down->buffer)));
}
}
}
@@ -570,22 +637,29 @@ downstream_read_cb(struct bufferevent *bev, void *arg)
We prepare the connection to be closed ASAP.
*/
static void
-error_or_eof(conn_t *conn, struct bufferevent *bev_err)
+error_or_eof(conn_t *conn)
{
+ circuit_t *circ = conn->circuit;
+ struct bufferevent *bev_err = conn->buffer;
struct bufferevent *bev_flush;
- log_debug("%s for %s", __func__, conn->peername);
- if (bev_err == conn->upstream) bev_flush = conn->downstream;
- else if (bev_err == conn->downstream) bev_flush = conn->upstream;
- else obfs_abort();
+ log_debug("%s for %s", __func__, conn->peername);
+ if (!circ || conn->flushing || !conn->is_open) {
+ conn_free(conn);
+ return;
+ }
- if (conn->flushing || !conn->is_open ||
- evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) {
- close_conn(conn);
+ bev_flush = (conn == circ->upstream) ? circ->downstream->buffer
+ : circ->upstream->buffer;
+ if (evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) {
+ conn_free(conn);
return;
}
- conn->flushing = 1;
+ /* XXX move ->flushing and ->is_open to circuit_t */
+ circ->upstream->flushing = 1;
+ circ->downstream->flushing = 1;
+
/* Stop reading and writing; wait for the other side to flush if it has
* data. */
bufferevent_disable(bev_err, EV_READ|EV_WRITE);
@@ -595,7 +669,7 @@ error_or_eof(conn_t *conn, struct bufferevent *bev_err)
official API to retrieve the callback functions and/or change
just one callback while leaving the others intact. */
bufferevent_setcb(bev_flush, bev_flush->readcb,
- close_conn_on_flush, flush_error_cb, conn);
+ conn_free_on_flush, flush_error_cb, conn);
}
/**
@@ -607,7 +681,7 @@ error_cb(struct bufferevent *bev, short what, void *arg)
{
conn_t *conn = arg;
int errcode = EVUTIL_SOCKET_ERROR();
- log_debug("%s for %s: what=%x err=%d", __func__, conn->peername,
+ log_debug("%s for %s: what=0x%04x errno=%d", __func__, conn->peername,
what, errcode);
/* It should be impossible to get here with BEV_EVENT_CONNECTED. */
@@ -615,21 +689,16 @@ error_cb(struct bufferevent *bev, short what, void *arg)
obfs_assert(!(what & BEV_EVENT_CONNECTED));
if (what & BEV_EVENT_ERROR) {
- log_warn("Error on %s side of connection from %s: %s",
- bev == conn->upstream ? "upstream" : "downstream",
+ log_warn("Error talking to %s: %s",
conn->peername,
evutil_socket_error_to_string(errcode));
} else if (what & BEV_EVENT_EOF) {
- log_info("EOF on %s side of connection from %s",
- bev == conn->upstream ? "upstream" : "downstream",
- conn->peername);
+ log_info("EOF from %s", conn->peername);
} else {
obfs_assert(what & BEV_EVENT_TIMEOUT);
- log_info("Timeout on %s side of connection from %s",
- bev == conn->upstream ? "upstream" : "downstream",
- conn->peername);
+ log_info("Timeout talking to %s", conn->peername);
}
- error_or_eof(arg, bev);
+ error_or_eof(conn);
}
/**
@@ -641,7 +710,7 @@ flush_error_cb(struct bufferevent *bev, short what, void *arg)
{
conn_t *conn = arg;
int errcode = EVUTIL_SOCKET_ERROR();
- log_debug("%s for %s: what=%x err=%d", __func__, conn->peername,
+ log_debug("%s for %s: what=0x%04x errno=%d", __func__, conn->peername,
what, errcode);
/* It should be impossible to get here with BEV_EVENT_CONNECTED. */
@@ -650,11 +719,10 @@ flush_error_cb(struct bufferevent *bev, short what, void *arg)
obfs_assert(conn->flushing);
- log_warn("Error during flush of %s side of connection from %s: %s",
- bev == conn->upstream ? "upstream" : "downstream",
+ log_warn("Error during flush of connection with %s: %s",
conn->peername,
evutil_socket_error_to_string(errcode));
- close_conn(conn);
+ conn_free(conn);
return;
}
@@ -668,35 +736,36 @@ static void
pending_conn_cb(struct bufferevent *bev, short what, void *arg)
{
conn_t *conn = arg;
- struct bufferevent *other;
log_debug("%s: %s", conn->peername, __func__);
- if (bev == conn->upstream) other = conn->downstream;
- else if (bev == conn->downstream) other = conn->upstream;
- else obfs_abort();
-
- /* Upon successful connection, enable traffic on the other side,
- and replace this callback with the regular error_cb */
+ /* Upon successful connection, enable traffic on both sides of the
+ connection, and replace this callback with the regular error_cb */
if (what & BEV_EVENT_CONNECTED) {
- obfs_assert(!conn->flushing);
+ circuit_t *circ = conn->circuit;
+ obfs_assert(circ);
+ obfs_assert(!circ->upstream->flushing);
+ obfs_assert(!circ->downstream->flushing);
+
+ circ->upstream->is_open = 1;
+ circ->downstream->is_open = 1;
- conn->is_open = 1;
- log_debug("%s: Successful %s connection", conn->peername,
- bev == conn->upstream ? "upstream" : "downstream");
+ log_debug("%s: Successful connection", conn->peername);
/* Queue handshake, if any. */
- if (proto_handshake(conn,
- bufferevent_get_output(conn->downstream))<0) {
+ if (proto_handshake(circ->downstream,
+ bufferevent_get_output(circ->downstream->buffer))<0) {
log_debug("%s: Error during handshake", conn->peername);
- close_conn(conn);
+ conn_free(conn);
return;
}
- /* XXX Dirty access to bufferevent guts. There appears to be no
- official API to retrieve the callback functions and/or change
- just one callback while leaving the others intact. */
- bufferevent_setcb(bev, bev->readcb, bev->writecb, error_cb, conn);
- bufferevent_enable(other, EV_READ|EV_WRITE);
+ bufferevent_setcb(circ->upstream->buffer,
+ upstream_read_cb, NULL, error_cb, circ->upstream);
+ bufferevent_setcb(circ->downstream->buffer,
+ downstream_read_cb, NULL, error_cb, circ->downstream);
+
+ bufferevent_enable(circ->upstream->buffer, EV_READ|EV_WRITE);
+ bufferevent_enable(circ->downstream->buffer, EV_READ|EV_WRITE);
return;
}
@@ -712,10 +781,20 @@ pending_conn_cb(struct bufferevent *bev, short what, void *arg)
static void
pending_socks_cb(struct bufferevent *bev, short what, void *arg)
{
- conn_t *conn = arg;
- log_debug("%s: %s", conn->peername, __func__);
- obfs_assert(bev == conn->downstream);
- obfs_assert(conn->socks_state);
+ conn_t *down = arg;
+ circuit_t *circ = down->circuit;
+ conn_t *up;
+ socks_state_t *socks;
+
+ log_debug("%s: %s", down->peername, __func__);
+
+ obfs_assert(circ);
+ obfs_assert(circ->upstream);
+ obfs_assert(circ->upstream->socks_state);
+ obfs_assert(circ->downstream == down);
+
+ up = circ->upstream;
+ socks = circ->upstream->socks_state;
/* If we got an error while in the ST_HAVE_ADDR state, chances are
that we failed connecting to the host requested by the CONNECT
@@ -725,14 +804,11 @@ pending_socks_cb(struct bufferevent *bev, short what, void *arg)
errno isn't meaningful in that case... */
if ((what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT))) {
int err = EVUTIL_SOCKET_ERROR();
- log_warn("Connection error: %s",
- evutil_socket_error_to_string(err));
- if (socks_state_get_status(conn->socks_state) == ST_HAVE_ADDR) {
- socks_send_reply(conn->socks_state,
- bufferevent_get_output(conn->upstream),
- err);
+ log_warn("Connection error: %s", evutil_socket_error_to_string(err));
+ if (socks_state_get_status(socks) == ST_HAVE_ADDR) {
+ socks_send_reply(socks, bufferevent_get_output(up->buffer), err);
}
- error_or_eof(conn, bev);
+ error_or_eof(down);
return;
}
@@ -744,39 +820,39 @@ pending_socks_cb(struct bufferevent *bev, short what, void *arg)
struct sockaddr *sa = (struct sockaddr*)&ss;
socklen_t slen = sizeof(&ss);
- obfs_assert(!conn->flushing);
+ obfs_assert(!up->flushing);
+ obfs_assert(!down->flushing);
+ /* Figure out where we actually connected to, and tell the socks client */
if (getpeername(bufferevent_getfd(bev), sa, &slen) == 0) {
- /* Figure out where we actually connected to so that we can tell the
- * socks client */
- socks_state_set_address(conn->socks_state, sa);
+ socks_state_set_address(socks, sa);
+ if (!down->peername)
+ down->peername = printable_address(sa, slen);
}
- socks_send_reply(conn->socks_state,
- bufferevent_get_output(conn->upstream), 0);
+ socks_send_reply(socks, bufferevent_get_output(up->buffer), 0);
/* Switch to regular upstream behavior. */
- socks_state_free(conn->socks_state);
- conn->socks_state = NULL;
- conn->is_open = 1;
- log_debug("%s: Successful %s connection", conn->peername,
- bev == conn->upstream ? "upstream" : "downstream");
-
- bufferevent_setcb(conn->upstream,
- upstream_read_cb, NULL, error_cb, conn);
- bufferevent_setcb(conn->downstream,
- downstream_read_cb, NULL, error_cb, conn);
- bufferevent_enable(conn->upstream, EV_READ|EV_WRITE);
+ socks_state_free(socks);
+ up->socks_state = NULL;
+ up->is_open = 1;
+ down->is_open = 1;
+ log_debug("%s: Successful outbound connection to %s",
+ up->peername, down->peername);
+
+ bufferevent_setcb(up->buffer, upstream_read_cb, NULL, error_cb, up);
+ bufferevent_setcb(down->buffer, downstream_read_cb, NULL, error_cb, down);
+ bufferevent_enable(up->buffer, EV_READ|EV_WRITE);
+ bufferevent_enable(down->buffer, EV_READ|EV_WRITE);
/* Queue handshake, if any. */
- if (proto_handshake(conn,
- bufferevent_get_output(conn->downstream))<0) {
- log_debug("%s: Error during handshake", conn->peername);
- close_conn(conn);
+ if (proto_handshake(down, bufferevent_get_output(down->buffer))) {
+ log_debug("%s: Error during handshake", down->peername);
+ conn_free(down);
return;
}
- if (evbuffer_get_length(bufferevent_get_input(conn->upstream)) != 0)
- upstream_read_cb(conn->upstream, conn);
+ if (evbuffer_get_length(bufferevent_get_input(up->buffer)) > 0)
+ upstream_read_cb(up->buffer, up);
return;
}
diff --git a/src/network.h b/src/network.h
index 8fb41ef..72df32c 100644
--- a/src/network.h
+++ b/src/network.h
@@ -12,23 +12,37 @@ void close_all_listeners(void);
void start_shutdown(int barbaric);
/**
- This struct defines the state of a connection between "upstream"
- and "downstream" peers (it's really two connections at the socket
- level). Again, each protocol may extend this structure with
- additional private data by embedding it as the first member of a
- larger structure. The protocol's conn_create() method is responsible
- only for filling in the |vtable| field of this structure, plus any
- private data of course.
+ This struct defines the state of one socket-level connection. Each
+ protocol may extend this structure with additional private data by
+ embedding it as the first member of a larger structure. The
+ protocol's conn_create() method is responsible only for filling in
+ the |cfg| and |mode| fields of this structure, plus any private
+ data of course.
+
+ An incoming connection is not associated with a circuit until the
+ destination for the other side of the circuit is known. An outgoing
+ connection is associated with a circuit from its creation.
*/
struct conn_t {
config_t *cfg;
char *peername;
socks_state_t *socks_state;
- struct bufferevent *upstream;
- struct bufferevent *downstream;
+ circuit_t *circuit;
+ struct bufferevent *buffer;
enum listen_mode mode : 30;
unsigned int flushing : 1;
unsigned int is_open : 1;
};
+/**
+ This struct defines a pair of established connections. The "upstream"
+ connection is to the higher-level client or server that we are proxying
+ traffic for. The "downstream" connection is to the remote peer.
+ */
+
+struct circuit_t {
+ conn_t *upstream;
+ conn_t *downstream;
+};
+
#endif
diff --git a/src/util.h b/src/util.h
index 8228790..2b37c48 100644
--- a/src/util.h
+++ b/src/util.h
@@ -62,6 +62,7 @@ unsigned int ui64_log2(uint64_t u64);
/***** Network types and functions. *****/
+typedef struct circuit_t circuit_t;
typedef struct config_t config_t;
typedef struct conn_t conn_t;
typedef struct protocol_vtable protocol_vtable;
More information about the tor-commits
mailing list