[or-cvs] r10512: Commit for 6th June. Coding complete todo: compile/bugfix/te (in libevent-urz/trunk: . doc)
Urz at seul.org
Urz at seul.org
Wed Jun 6 11:31:04 UTC 2007
Author: Urz
Date: 2007-06-06 07:31:04 -0400 (Wed, 06 Jun 2007)
New Revision: 10512
Modified:
libevent-urz/trunk/Makefile.am
libevent-urz/trunk/buffer.c
libevent-urz/trunk/doc/plan.txt
libevent-urz/trunk/event-internal.h
libevent-urz/trunk/event.h
libevent-urz/trunk/sa_evbuffer.c
Log:
Commit for 6th June.
Coding complete
todo: compile/bugfix/testcode
Modified: libevent-urz/trunk/Makefile.am
===================================================================
--- libevent-urz/trunk/Makefile.am 2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/Makefile.am 2007-06-06 11:31:04 UTC (rev 10512)
@@ -39,7 +39,7 @@
endif
libevent_la_SOURCES = event.c buffer.c evbuffer.c log.c event_tagging.c \
- http.c evhttp.h http-internal.h evdns.c evdns.h strlcpy.c \
+ sa_evbuffer.c http.c evhttp.h http-internal.h evdns.c evdns.h strlcpy.c \
strlcpy-internal.h strlcpy-internal.h \
$(SYS_SRC)
libevent_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS)
Modified: libevent-urz/trunk/buffer.c
===================================================================
--- libevent-urz/trunk/buffer.c 2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/buffer.c 2007-06-06 11:31:04 UTC (rev 10512)
@@ -60,7 +60,7 @@
#endif
#include "event.h"
-#include "event_internal.h"
+#include "event-internal.h"
void evbuffer_lock(struct evbuffer *buffer)
{
@@ -328,16 +328,12 @@
evbuffer_align(struct evbuffer *buf)
{
evbuffer_lock(buf);
- evbuffer_align(buf);
- evbuffer_unlock(buf);
-}
-
-static inline void
-evbuffer_align(struct evbuffer *buf)
-{
+
memmove(buf->orig_buffer, buf->buffer, buf->off);
buf->buffer = buf->orig_buffer;
buf->misalign = 0;
+
+ evbuffer_unlock(buf);
}
/* Expands the available space in the event buffer to at least datlen */
@@ -601,4 +597,4 @@
buffer->cbarg = cbarg;
evbuffer_unlock(buffer);
-}
\ No newline at end of file
+}
Modified: libevent-urz/trunk/doc/plan.txt
===================================================================
--- libevent-urz/trunk/doc/plan.txt 2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/doc/plan.txt 2007-06-06 11:31:04 UTC (rev 10512)
@@ -33,32 +33,22 @@
to set the bufferevent flags to let the main thread know callbacks are ready, and to call
a function to ensure the main event-loop is notified of events.
- Mutex work is done.
- - Notification function written, but not yet used.
- - bufferevent pending callback flags not yet set.
+ - Notification function written, used as needed.
+ - bufferevent pending callback flags set as needed.
Write a short read event handler for the event_loop half of the socketpair which
simply reads all there is to be read (no blocking!) and calls the real read/write callbacks
depending on what it reads.
- Done
-
-Functions yet to modify:
-int sa_bufferevent_base_set(struct event_base *base, struct bufferevent *bufev);
-int sa_bufferevent_priority_set(struct bufferevent *bufev, int pri);
-void sa_bufferevent_free(struct bufferevent *bufev);
-int sa_bufferevent_write(struct bufferevent *bufev, void *data, size_t size);
-int sa_bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
-size_t sa_bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
-int sa_bufferevent_enable(struct bufferevent *bufev, short event);
-int sa_bufferevent_disable(struct bufferevent *bufev, short event);
-void sa_bufferevent_settimeout(struct bufferevent *bufev,
- int timeout_read, int timeout_write);
TODOs for tomorrow:
-Modify functions above where needed.
Write regression tests.
Compile and Test.
Write sample code
+Talk to nickm, re:
+ - Timeouts
+ - Automake
Testing:
Add test cases to libevent testing code... test/regress.c I believe
Modified: libevent-urz/trunk/event-internal.h
===================================================================
--- libevent-urz/trunk/event-internal.h 2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/event-internal.h 2007-06-06 11:31:04 UTC (rev 10512)
@@ -68,7 +68,7 @@
#define FD_CLOSEONEXEC(x)
#endif
-LIST_HEAD(sa_evbuf_list_elem, struct sa_bufferevent) bufev_list_head;
+LIST_HEAD(sa_evbuf_list_elem, sa_bufferevent) bufev_list_head;
#ifdef __cplusplus
}
Modified: libevent-urz/trunk/event.h
===================================================================
--- libevent-urz/trunk/event.h 2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/event.h 2007-06-06 11:31:04 UTC (rev 10512)
@@ -31,6 +31,7 @@
extern "C" {
#endif
+#include "compat/sys/queue.h"
#include <stdarg.h>
#ifdef WIN32
@@ -222,7 +223,7 @@
void *cbarg;
#ifdef WIN32
- HANDLE lock
+ HANDLE lock;
#endif
};
@@ -306,7 +307,7 @@
u_char del_read_event_set;
u_char del_write_event_set;
- LIST_ENTRY(struct sa_bufferevent) list_elem;
+ LIST_ENTRY(sa_bufferevent) list_elem;
};
struct bufferevent *sa_bufferevent_new(evbuffercb readcb,
Modified: libevent-urz/trunk/sa_evbuffer.c
===================================================================
--- libevent-urz/trunk/sa_evbuffer.c 2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/sa_evbuffer.c 2007-06-06 11:31:04 UTC (rev 10512)
@@ -83,9 +83,11 @@
for (np = bufev_list_head.lh_first; np != NULL; np = np->list_elem.le_next) {
if(np->del_read_event_set) {
(*np->readcb)(np, np->cbarg);
+ np->del_read_event_set = 0;
}
if(np->del_write_event_set) {
(*np->writecb)(np, np->cbarg);
+ np->del_write_event_set = 0;
}
}
}
@@ -146,147 +148,96 @@
return (event_add(ev, ptv));
}
-/*
- * This callback is executed when the size of the input buffer changes.
- * We use it to apply back pressure on the reading side.
+/*
+ * Used by a 'loader' thread to add data to a sa_bufferevent.
+ * The loader thread reads/makes up/produces data, and adds it
+ * to the sa_bufferevent with sa_bufferevent_load. The return
+ * value is the amount of data copied into the sa_bufferevent.
+ * If an error occurs, 0 will be returned to indicate that no
+ * data was copied.
*/
-void
-sa_bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
- void *arg) {
- struct sa_bufferevent *bufev = arg;
- /*
- * If we are below the watermark then reschedule reading if it's
- * still enabled.
- */
- if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
- evbuffer_setcb(buf, NULL, NULL);
-
- if (bufev->enabled & EV_READ)
- sa_bufferevent_add(&bufev->ev_read, bufev->timeout_read);
- }
-}
-
-static void
-sa_bufferevent_readcb(int fd, short event, void *arg)
+size_t
+sa_bufferevent_load(struct sa_bufferevent *bufev, void *data, size_t size)
{
- struct sa_bufferevent *bufev = arg;
int res = 0;
short what = EVBUFFER_READ;
size_t len;
- int howmuch = -1;
+ size_t toload;
- if (event == EV_TIMEOUT) {
- what |= EVBUFFER_TIMEOUT;
- goto error;
- }
-
/*
* If we have a high watermark configured then we don't want to
* read more data than would make us reach the watermark.
*/
- if (bufev->wm_read.high != 0)
- howmuch = bufev->wm_read.high;
+ if (bufev->wm_read.high != 0 && ((size_t) bufev->wm_read.high < size)) {
+ toload = (size_t) bufev->wm_read.high;
+ } else {
+ toload = size;
+ }
- res = evbuffer_read(bufev->input, fd, howmuch);
+ res = evbuffer_add(bufev->input, data, toload);
if (res == -1) {
- if (errno == EAGAIN || errno == EINTR)
- goto reschedule;
- /* error case */
- what |= EVBUFFER_ERROR;
- } else if (res == 0) {
- /* eof case */
- what |= EVBUFFER_EOF;
+ return 0;
}
- if (res <= 0)
- goto error;
+ /*
+ * Since we do not directly control the load/unloading
+ * we cannot request more/less data be read/writen.
+ * (This comment will only make sense if you view this
+ * function side by side with bufferevent_readcb, from
+ * evbuffer.c which is where this function originated from)
+ */
- sa_bufferevent_add(&bufev->ev_read, bufev->timeout_read);
-
- /* See if this callbacks meets the water marks */
- len = EVBUFFER_LENGTH(bufev->input);
- if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
- return;
- if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
- struct evbuffer *buf = bufev->input;
- event_del(&bufev->ev_read);
-
- /* Now schedule a callback for us */
- evbuffer_setcb(buf, sa_bufferevent_read_pressure_cb, bufev);
- return;
- }
-
/* Invoke the user callback - must always be called last */
- if (bufev->readcb != NULL)
- (*bufev->readcb)(bufev, bufev->cbarg);
- return;
+ if (bufev->readcb != NULL) {
+ bufev->del_read_event_set = 1;
+ notify();
+ }
+ return toload;
- reschedule:
- sa_bufferevent_add(&bufev->ev_read, bufev->timeout_read);
- return;
-
- error:
- (*bufev->errorcb)(bufev, what, bufev->cbarg);
}
-static void
-sa_bufferevent_writecb(int fd, short event, void *arg)
+/*
+ * Used by an 'unloader' thread to pull data from a sa_bufferevent.
+ * The unloader thread writes/processes/forwards the data, and removes it
+ * from the sa_bufferevent with sa_bufferevent_unload. The return
+ * value is the amount of data copied into the provided buffer.
+ * If an error occurs, 0 will be returned to indicate that no
+ * data was copied.
+ * The amount of data copied will always be <= the size value supplied.
+ */
+
+size_t
+sa_bufferevent_unload(struct sa_bufferevent *bufev, void *data, size_t size)
{
- struct sa_bufferevent *bufev = arg;
int res = 0;
- short what = EVBUFFER_WRITE;
+ size_t copysize;
+
+ evbuffer_lock(bufev->output);
+
+ if(EVBUFFER_LENGTH(bufev->output) < size) {
+ copysize = EVBUFFER_LENGTH(bufev->output);
+ } else {
+ copysize = size;
+ }
+
+ if(size == 0)
+ return 0;
- if (event == EV_TIMEOUT) {
- what |= EVBUFFER_TIMEOUT;
- goto error;
- }
+ memcpy(data, bufev->output->buffer, copysize);
+ evbuffer_drain(bufev->output, copysize);
- if (EVBUFFER_LENGTH(bufev->output)) {
- res = evbuffer_write(bufev->output, fd);
- if (res == -1) {
-#ifndef WIN32
-/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
- *set errno. thus this error checking is not portable*/
- if (errno == EAGAIN ||
- errno == EINTR ||
- errno == EINPROGRESS)
- goto reschedule;
- /* error case */
- what |= EVBUFFER_ERROR;
-
-#else
- goto reschedule;
-#endif
-
- } else if (res == 0) {
- /* eof case */
- what |= EVBUFFER_EOF;
- }
- if (res <= 0)
- goto error;
- }
-
- if (EVBUFFER_LENGTH(bufev->output) != 0)
- sa_bufferevent_add(&bufev->ev_write, bufev->timeout_write);
-
/*
* Invoke the user callback if our buffer is drained or below the
* low watermark.
*/
if (bufev->writecb != NULL &&
- EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
- (*bufev->writecb)(bufev, bufev->cbarg);
+ EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) {
+ bufev->del_write_event_set = 1;
+ notify();
+ }
- return;
-
- reschedule:
- if (EVBUFFER_LENGTH(bufev->output) != 0)
- sa_bufferevent_add(&bufev->ev_write, bufev->timeout_write);
- return;
-
- error:
- (*bufev->errorcb)(bufev, what, bufev->cbarg);
+ return copysize;
}
/*
@@ -344,13 +295,18 @@
return (bufev);
}
+/*
+ * Again, one socketpair / event, so the priority can only be changed once.
+ * If we need it, I could probably implement prioritization within the
+ * sa_bufferevent callback, but that will still behave the same as
+ * a normal bufferevent.
+ */
+
int
-sa_bufferevent_priority_set(struct sa_bufferevent *bufev, int priority)
+sa_bufferevent_priority_set(int priority)
{
- if (event_priority_set(&bufev->ev_read, priority) == -1)
+ if (event_priority_set(&evbuffer_del_event, priority) == -1)
return (-1);
- if (event_priority_set(&bufev->ev_write, priority) == -1)
- return (-1);
return (0);
}
@@ -394,8 +350,10 @@
return (res);
/* If everything is okay, we need to schedule a write */
- if (size > 0 && (bufev->enabled & EV_WRITE))
- sa_bufferevent_add(&bufev->ev_write, bufev->timeout_write);
+ if (size > 0 && (bufev->enabled & EV_WRITE)) {
+ bufev->del_write_event_set = 1;
+ notify();
+ }
return (res);
}
@@ -405,9 +363,13 @@
{
int res;
+ evbuffer_lock(buf);
+
res = sa_bufferevent_write(bufev, buf->buffer, buf->off);
if (res != -1)
evbuffer_drain(buf, buf->off);
+
+ evbuffer_unlock(buf);
return (res);
}
@@ -416,6 +378,8 @@
sa_bufferevent_read(struct sa_bufferevent *bufev, void *data, size_t size)
{
struct evbuffer *buf = bufev->input;
+
+ evbuffer_lock(buf);
if (buf->off < size)
size = buf->off;
@@ -425,6 +389,8 @@
if (size)
evbuffer_drain(buf, size);
+
+ evbuffer_unlock(buf);
return (size);
}
@@ -432,15 +398,6 @@
int
sa_bufferevent_enable(struct sa_bufferevent *bufev, short event)
{
- if (event & EV_READ) {
- if (sa_bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
- return (-1);
- }
- if (event & EV_WRITE) {
- if (sa_bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
- return (-1);
- }
-
bufev->enabled |= event;
return (0);
}
@@ -448,15 +405,6 @@
int
sa_bufferevent_disable(struct sa_bufferevent *bufev, short event)
{
- if (event & EV_READ) {
- if (event_del(&bufev->ev_read) == -1)
- return (-1);
- }
- if (event & EV_WRITE) {
- if (event_del(&bufev->ev_write) == -1)
- return (-1);
- }
-
bufev->enabled &= ~event;
return (0);
}
@@ -495,15 +443,18 @@
0, EVBUFFER_LENGTH(bufev->input), bufev);
}
+/*
+ * Because we are using only one socketpair, and hence one event, we can only
+ * set the event_base for *all* sa_bufferevents at once. I don't know enough
+ * about what event_base(s) do, or how people use bufferevent_base_set to know
+ * if this is important or not
+ */
+
int
-sa_bufferevent_base_set(struct event_base *base, struct sa_bufferevent *bufev)
+sa_bufferevent_base_set(struct event_base *base)
{
int res;
- res = event_base_set(base, &bufev->ev_read);
- if (res == -1)
- return (res);
-
- res = event_base_set(base, &bufev->ev_write);
+ res = event_base_set(base, &evbuffer_del_event);
return (res);
}
More information about the tor-commits
mailing list