[or-cvs] r10771: Added initial code for writer and completed local establishm (libevent-urz/trunk/loaders)
Urz at seul.org
Urz at seul.org
Mon Jul 9 10:55:58 UTC 2007
Author: Urz
Date: 2007-07-09 06:55:57 -0400 (Mon, 09 Jul 2007)
New Revision: 10771
Modified:
libevent-urz/trunk/loaders/IOCPloader.c
libevent-urz/trunk/loaders/IOCPloader.h
Log:
Added initial code for writer and completed local establishment code.
Modified: libevent-urz/trunk/loaders/IOCPloader.c
===================================================================
--- libevent-urz/trunk/loaders/IOCPloader.c 2007-07-09 08:44:17 UTC (rev 10770)
+++ libevent-urz/trunk/loaders/IOCPloader.c 2007-07-09 10:55:57 UTC (rev 10771)
@@ -3,11 +3,12 @@
#include "../event.h"
#include "IOCPloader.h"
+#include "IOCPloader.h"
#define NO_WORKERS 2
HANDLE IOCP;
-HANDLE Threads[NO_WORKERS];
+HANDLE Threads[NO_WORKERS+1];
WSAEVENT *eventList = NULL;
connection *connList = NULL;
DWORD listSize = 0;
@@ -36,7 +37,7 @@
* initializing is partially done.
*/
-DWORD new_connection_obj(SOCKET *s, sa_bufferevent * bufevent) {
+DWORD IOCPloader_bind(SOCKET *s, sa_bufferevent * bufevent) {
DWORD myListElem;
// gain global list lock
@@ -54,17 +55,33 @@
// get connection lock here
connList[myListElem].localbuf = bufevent;
- connList[myListElem].netbuf = malloc(sizeof(WSABUF));
- connList[myListElem].netbuf->len = SUGGESTED_BUF_SIZE;
- connList[myListElem].netbuf->buf = malloc(SUGGESTED_BUF_SIZE);
+ connList[myListElem].recvbuf = malloc(sizeof(WSABUF));
+ connList[myListElem].recvbuf->len = SUGGESTED_BUF_SIZE;
+ connList[myListElem].recvbuf->buf = malloc(SUGGESTED_BUF_SIZE);
+ connList[myListElem].sendbuf = malloc(sizeof(WSABUF));
+ connList[myListElem].sendbuf->len = SUGGESTED_BUF_SIZE;
+ connList[myListElem].sendbuf->buf = malloc(SUGGESTED_BUF_SIZE);
+
connList[myListElem].sock = s;
- connList[myListElem].ol = calloc(sizeof(WSAOVERLAPPED));
- connList[myListElem].ol->hEvent = (HANDLE) eventList[myListElem];
+ connList[myListElem].recvol = calloc(sizeof(OLOPERATION));
+ connList[myListElem].recvol.op = OP_RECV;
+ connList[myListElem].recvol.connIndex = myListElem;
+ connList[myListElem].recvol.ol = calloc(sizeof(WSAOVERLAPPED));
+ connList[myListElem].recvol.ol->hEvent = (HANDLE) eventList[myListElem];
+
+
+ connList[myListElem].sendol = calloc(sizeof(OLOPERATION));
+ connList[myListElem].sendol.op = OP_SEND;
+ connList[myListElem].sendol.connIndex = myListElem;
+ connList[myListElem].sendol.ol = calloc(sizeof(WSAOVERLAPPED));
// free conneciton lock here
// relase global list lock
+
+ // Despite the name, this call associates the socket with the I/O Completion port
+ IOCP = CreateIoCompletionPort(connList[myListElem].sock, IOCP, (ULONG_PTR) myListElem, 0);
}
DWORD WINAPI IOCPLoaderMain(LPVOID nodata) {
@@ -89,85 +106,136 @@
}
// This concludes the code modified from the Startup function()
+ Threads[NO_WORKERS] = CreateThread(NULL, 0, &iocp_writer_thread, NULL, 0, NULL);
}
+UINT iocp_writer_thread(LPVOID pParam) {
+ DWORD listpos;
+ size_t unloaded;
+ DWORD WSASendFags = 0;
+ while(1) {
+ for(listpos = 0; listpos < listSize; listpos++) {
+ // lock connList[listpos]
+ if(connList[listpos].canSend) {
+ // grab data that needs to be sent
+ unloaded = sa_bufferevent_unload(connList[listpos].localbuf,
+ connList[listpos].sendbuf->buf, SUGGESTED_BUF_SIZE);
+ if(unloaded == 0) {
+ // no data ready for sending
+ // unlock connList[listpos]
+ continue;
+ }
+ // TODO: How does WSASend know the amount of available data? Does it
+ // assume the WSABUF is full?
+ // remind us that sending is in progress and we can't overwrite the buffer
+ connList[listpos].canSend = 0;
+ WSASend(
+ connList[listpos].sock,
+ // Socket to send on
+ connList[listpos].sendbuf,
+ // 'array' of WSA buffers to use
+ 1,
+ // array of size 1
+ NULL,
+ // this socket is overlapped, so can't retrieve sent size immeditately
+ &WSASendFlags,
+ // any send flags
+ &connList[listpos].sendol,
+ // overlapped struct for the operation.
+ NULL
+ // no completion routine
+ );
+ }
+ // unlock connList[listpos]
+ }
+ }
+}
+
UINT iocp_worker_thread(LPVOID pParam) {
- // From IOWorkerThreadProc
- LPOVERLAPPED CompletedOverlapped;
+ OLOPERATION CompletedOverlapped;
// See http://msdn2.microsoft.com/en-us/library/ms684342.aspx
- DWORD WaitRet;
- ULONG *CompletionKey;
+ DWORD CompleteSize;
+ DWORD CompletionKey;
BOOL GQCSRet
connection *Conn;
UINT WSARecvRet;
ULONG WSARecvFlags = MSG_WAITALL;
while(1) {
-
- WaitRet = WaitForMultipleObjects(
- listSize,
- // number of socket events in array
- eventList,
- // array of socket read event handles
- FALSE,
- // wait until any one event happens
+ Conn = NULL;
+
+ GQCSRet = GetQueuedCompletionStatus(
+ IOCP, // The IOCP to get statuses from
+ &CompleteSize, // "... receives the number of bytes transferred
+ //during an I/O operation that has completed."
+ (ULONG_PTR) &CompletionKey,
+ // A pointer to a variable that receives the completion key value
+ // associated with the file handle whose I/O operation has completed.
+ // In IOCPS.cpp : (LPDWORD) &lpClientContext,
+ &CompletedOverlapped
+ // A pointer to a variable that receives the address of the OVERLAPPED
+ // structure that was specified when the completed I/O operation was started.
INFINITE
- // wait forever
+ // The number of milliseconds that the caller is willing to wait for a
+ // completion packet to appear at the completion port.
);
-
- if(WaitRet > listSize) {
- // An error happened.
+ if(!GQCSRet) {
+ printf("GetQueuedCompletionStatus error %d\n", GetLastError());
+ continue;
}
+
// This is no error, a completed read event has occured.
- // Get the connection ths applies to. See semantics of
- // return value for WaitForMultipleObjects
- // (http://msdn2.microsoft.com/en-us/library/ms687025.aspx)
- // to understand why this works.
- Conn = connList + WaitRet;
+ // Get the connection ths applies to.
+ Conn = &connList[CompletionKey];
- // Now we have a completed read event, perform the operations
- // required on the read data.
- ReadComplete(Conn);
-
-
- // Now the data has been removed from the WSABUF, we can reset the
- // read to continue reading.
- // http://msdn2.microsoft.com/en-us/library/ms741688.aspx
- WSARecvRet = WSARecv(
- Conn->sock,
- // The socket to recieve from
- Conn->netbuf,
- // Pointer to an 'array' of WSABUFs.
- 1,
- // The 'array' is of size 1
- NULL,
- // This parameter would recieve the size in bytes of the
- // read. However, because we are using overlapped, it doesn't.
- &WSARecvFlags,
- // Flags which control the operation of WSARecv. I belive MSG_WAITALL
- // is the one we want.
- Conn->ol,
- // The overlapped structure for the event.
- NULL
- // The callback - we are using events for this, so that's not important
- );
-
+ // lock Conn
+ if(CompletedOverlapped.op == OP_SEND) {
+ // the connection has sent all data and is ready for more
+ Conn->canSend = 1;
+ } else if(CompletedOverlapped.op == OP_RECV) {
+ // Now we have a completed read event, perform the operations
+ // required on the read data.
+ ReadComplete(Conn, CompleteSize);
+
+ // Now the data has been removed from the WSABUF, we can reset the
+ // read to continue reading.
+ // http://msdn2.microsoft.com/en-us/library/ms741688.aspx
+ WSARecvRet = WSARecv(
+ Conn->sock,
+ // The socket to recieve from
+ Conn->recvbuf,
+ // Pointer to an 'array' of WSABUFs.
+ 1,
+ // The 'array' is of size 1
+ NULL,
+ // This parameter would recieve the size in bytes of the
+ // read. However, because we are using overlapped, it doesn't.
+ &WSARecvFlags,
+ // Flags which control the operation of WSARecv. I belive MSG_WAITALL
+ // is the one we want.
+ &Conn->recvol,
+ // The overlapped structure for the event.
+ NULL
+ // The callback - we are using events for this, so that's not important
+ );
+ }
+ // unlock Conn
}
}
-void ReadComplete(connection *Conn) {
+void ReadComplete(connection *Conn, DWORD size) {
BOOL GORRet;
- DWORD TransferSize;
+ //DWORD TransferSize;
DWORD Flags;
size_t loadRet, toload, loaded;
char *upto;
- GORRet = WSAGetOverlappedResult(
+/* GORRet = WSAGetOverlappedResult(
Conn->sock,
// The socket to get the read size from
- Conn->ol,
+ &Conn->recvol,
// The overlapped data structure relating to the read event
&TransferSize,
// Gets the size of the transfer
@@ -176,10 +244,10 @@
// is completed (we waited for it) we don't really care.
&Flags
// Gets the flags for the operation
- );
+ ); */
- upto = Conn->netbuf.buf;
- toload = (size_t) TransferSize;
+ upto = Conn->recvbuf.buf;
+ toload = (size_t) size // TransferSize;
while(toload > 0) {
loaded = sa_bufferevent_load(Conn, upto, toload);
Modified: libevent-urz/trunk/loaders/IOCPloader.h
===================================================================
--- libevent-urz/trunk/loaders/IOCPloader.h 2007-07-09 08:44:17 UTC (rev 10770)
+++ libevent-urz/trunk/loaders/IOCPloader.h 2007-07-09 10:55:57 UTC (rev 10771)
@@ -4,10 +4,26 @@
#define SUGGESTED_BUF_SIZE 4096
+#define OP_SEND 1
+#define OP_RECV 2
+
typedef struct {
+ /*
+ This struct must maintain compatiblity with the OVERLAPPED struct.
+ As such, the first element must be an OVERLAPPED struct
+ */
+ OVERLAPPED *ol;
+ char op;
+ DWORD connIndex;
+} OLOPERATION;
+
+typedef struct {
sa_bufferevent *localbuf;
- WSABUF *netbuf;
+ WSABUF *recvbuf;
+ WSABUF *sendbuf;
SOCKET sock;
- OVERLAPPED *ol;
+ OLOPERATION recvol;
+ OLOPERATION sendol;
// mutex lock
+ int canSend;
} connection;
\ No newline at end of file
More information about the tor-commits
mailing list