[tor-commits] [tordnsel/master] introduce conduits
arlo at torproject.org
arlo at torproject.org
Sat Apr 16 06:08:43 UTC 2016
commit fe5b3964db5f8b7f88fc308b38c16e1ccd7fa849
Author: David Kaloper <david at numm.org>
Date: Sat Sep 21 19:18:52 2013 +0200
introduce conduits
And get rid of TorDNSEL.Util.hGetLine.
---
src/TorDNSEL/ExitTest/Request.hs | 94 ++++++++++---------
src/TorDNSEL/ExitTest/Server/Internals.hs | 2 +-
src/TorDNSEL/TorControl/Internals.hs | 78 ++++++++--------
src/TorDNSEL/Util.hsc | 148 ++++++++----------------------
tordnsel.cabal | 2 +-
5 files changed, 130 insertions(+), 194 deletions(-)
diff --git a/src/TorDNSEL/ExitTest/Request.hs b/src/TorDNSEL/ExitTest/Request.hs
index 87a2fbd..82e198c 100644
--- a/src/TorDNSEL/ExitTest/Request.hs
+++ b/src/TorDNSEL/ExitTest/Request.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE OverloadedStrings #-}
+
-----------------------------------------------------------------------------
-- |
-- Module : TorDNSEL.ExitTest.Request
@@ -24,14 +26,20 @@ module TorDNSEL.ExitTest.Request (
, cookieLen
) where
-import Control.Arrow ((***))
-import Control.Monad (guard)
+import Control.Arrow ((***), second)
+import Control.Applicative
+import Control.Monad
import Control.Monad.Trans (lift, liftIO)
+import Data.Monoid
import qualified Data.ByteString.Char8 as B
+import qualified Data.ByteString.Lazy as BL
import Data.Char (isSpace, toLower)
import qualified Data.Map as M
import System.IO (Handle)
+import Data.Conduit
+import qualified Data.Conduit.Binary as CB
+
import TorDNSEL.Util
--------------------------------------------------------------------------------
@@ -40,55 +48,50 @@ import TorDNSEL.Util
-- | Create an HTTP request that POSTs a cookie to one of our listening ports.
createRequest :: B.ByteString -> Port -> Cookie -> B.ByteString
createRequest host port cookie =
- B.intercalate (B.pack "\r\n")
- -- POST should force caching proxies to forward the request.
- [ B.pack "POST / HTTP/1.0"
- -- Host doesn't exist in HTTP 1.0. We'll use it anyway to help the request
- -- traverse transparent proxies.
- , B.pack "Host: " `B.append` hostValue
- , B.pack "Content-Type: application/octet-stream"
- , B.pack "Content-Length: " `B.append` B.pack (show cookieLen)
- , B.pack "Connection: close"
- , B.pack "\r\n" `B.append` unCookie cookie ]
+ B.intercalate "\r\n" -- CRLF between consecutive request lines
+ -- POST should force caching proxies to forward the request.
+ [ "POST / HTTP/1.0"
+ -- Host doesn't exist in HTTP 1.0. We'll use it anyway to help the request
+ -- traverse transparent proxies.
+ , "Host: " <> hostValue
+ , "Content-Type: application/octet-stream"
+ , "Content-Length: " <> bshow cookieLen
+ , "Connection: close"
+ , "\r\n" <> unCookie cookie ] -- separator + this CRLF = blank line ending the headers; the cookie is the body
+
where
hostValue
| port == 80 = host
- | otherwise = B.concat [host, B.pack ":", B.pack $ show port]
+ | otherwise = B.concat [host, ":", bshow port] -- any port other than 80 is spelled out as host:port
-- | Given an HTTP client, return the cookie contained in the body of the HTTP
-- request if it's well-formatted, otherwise return 'Nothing'.
-getRequest :: Handle -> MaybeT IO Cookie
-getRequest client = do
- (reqLine,headers) <- liftIO $ getHeader
- guard $ reqLine `elem` [B.pack "POST / HTTP/1.0", B.pack "POST / HTTP/1.1"]
- Just contentType <- return $ B.pack "content-type" `M.lookup` headers
- guard $ contentType == B.pack "application/octet-stream"
- Just contentLen <- return $ readInt =<< B.pack "content-length" `M.lookup` headers
- guard $ contentLen == cookieLen
-
- fmap Cookie . lift $ B.hGet client cookieLen
+getRequest :: Handle -> IO (Maybe Cookie)
+getRequest client =
+ CB.sourceHandle client $= CB.isolate maxReqLen $$ do -- never read more than maxReqLen bytes from the client
+ mh <- getHeaders
+ case checkHeaders mh of
+ Nothing -> return Nothing
+ Just _ -> Just . Cookie <$> takeC cookieLen -- the body is the cookie
+
where
- maxHeaderLen = 2048
- crlf = B.pack "\r\n"
- crlfLen = 2
-
- getHeader = do
- reqLine <- hGetLine client crlf maxHeaderLen
- headers <- getHeaders (maxHeaderLen - B.length reqLine - crlfLen)
- return (reqLine, M.fromList headers)
-
- getHeaders remain
- | remain <= 0 = return []
- | otherwise = do
- header <- hGetLine client crlf remain
- if B.null header
- then return []
- else do
- headers <- getHeaders (remain - B.length header - crlfLen)
- return (readHeader header : headers)
-
- readHeader =
- (B.map toLower *** B.dropWhile isSpace . B.drop 1) . B.break (== ':')
+ maxReqLen = 2048 + cookieLen -- 2048 bytes for request line + headers, plus the cookie body
+ line = frameC "\r\n" -- one CRLF-terminated line, terminator consumed and dropped
+
+ getHeaders =
+ (,) <$> line
+ <*> (decodeHeaders <$> muntil B.null line)
+ where
+ decodeHeaders = M.fromList .
+ map ((B.map toLower *** B.dropWhile isSpace . B.drop 1) -- drop 1, not the partial tail: a line without ':' must not crash
+ . B.break (== ':'))
+
+ checkHeaders (reqLine, headers) = do
+ contentType <- "content-type" `M.lookup` headers
+ contentLen <- readInt =<< "content-length" `M.lookup` headers
+ guard $ reqLine `elem` ["POST / HTTP/1.0", "POST / HTTP/1.1"]
+ guard $ contentType == "application/octet-stream"
+ guard $ contentLen == cookieLen
--------------------------------------------------------------------------------
-- Cookies
@@ -97,7 +100,7 @@ getRequest client = do
-- associate it with the exit node we're testing through and use it look up that
-- exit node when we receive it on a listening port.
newtype Cookie = Cookie { unCookie :: B.ByteString }
- deriving (Eq, Ord)
+ deriving (Eq, Ord, Show)
-- | Create a new cookie from pseudo-random data.
newCookie :: (Int -> IO B.ByteString) -> IO Cookie
@@ -106,3 +109,4 @@ newCookie getRandBytes = Cookie `fmap` getRandBytes cookieLen
-- | The cookie length in bytes.
cookieLen :: Int
cookieLen = 32
+
diff --git a/src/TorDNSEL/ExitTest/Server/Internals.hs b/src/TorDNSEL/ExitTest/Server/Internals.hs
index 8f9a872..13e2136 100644
--- a/src/TorDNSEL/ExitTest/Server/Internals.hs
+++ b/src/TorDNSEL/ExitTest/Server/Internals.hs
@@ -181,7 +181,7 @@ handleMessage conf s (NewClient sock addr) = do
tid <- forkLinkIO . (`E.finally` signalQSemN (handlerSem s) 1) .
E.bracket (socketToHandle sock ReadWriteMode) hClose $ \client -> do
r <- timeout readTimeout . E.try $ do
- r <- runMaybeT $ getRequest client
+ r <- getRequest client
case r of
Just cookie -> do
now <- getCurrentTime
diff --git a/src/TorDNSEL/TorControl/Internals.hs b/src/TorDNSEL/TorControl/Internals.hs
index 254d6b1..7e0b8f1 100644
--- a/src/TorDNSEL/TorControl/Internals.hs
+++ b/src/TorDNSEL/TorControl/Internals.hs
@@ -95,7 +95,6 @@ module TorDNSEL.TorControl.Internals (
-- * Backend connection manager
, IOMessage(..)
, startIOManager
- , ReplyType(..)
, startSocketReader
-- * Data types
@@ -130,6 +129,7 @@ module TorDNSEL.TorControl.Internals (
, parseReplyCode
, throwIfNotPositive
, isPositive
+
) where
import Control.Arrow (first, second)
@@ -156,6 +156,10 @@ import Data.Typeable (Typeable)
import System.IO (Handle, hClose, hSetBuffering, BufferMode(..), hFlush)
import System.IO.Error (isEOFError)
+import Data.Conduit
+import qualified Data.Conduit.Binary as CB
+import qualified Data.Conduit.List as CL
+
import TorDNSEL.Control.Concurrent.Link
import TorDNSEL.Control.Concurrent.Future
import TorDNSEL.Control.Concurrent.Util
@@ -213,7 +217,6 @@ openConnection handle mbPasswd = do
let conn' = Conn tellIOManager ioManagerTid protInfo confSettings
authenticate mbPasswd conn'
useFeature [VerboseNames] conn'
- putStrLn "*X MRMLJ"
return conn'
) `onException'` closeConnection' conn confSettings
@@ -823,45 +826,48 @@ startIOManager handle = do
eventCode = B.takeWhile (/= ' ') . repText
--- | Reply types in a single sequence of replies.
-data ReplyType
- = MidReply !Reply -- ^ A reply preceding other replies.
- | LastReply !Reply -- ^ The last reply.
- deriving Show
-
-- | Start a thread that reads replies from @handle@ and passes them to
-- @sendRepliesToIOManager@, linking it to the calling thread.
startSocketReader :: Handle -> ([Reply] -> IO ()) -> IO ThreadId
startSocketReader handle sendRepliesToIOManager =
- forkLinkIO . forever $ readReplies >>= sendRepliesToIOManager
+ forkLinkIO $ CB.sourceHandle handle $=
+ repliesC $$
+ CL.mapM_ sendRepliesToIOManager -- NOTE(review): this pipeline returns normally when the handle reaches EOF; confirm the link treats a normal exit as connection loss
+
+-- | Group reply lines into complete replies: '-' and '+' lines accumulate, a ' ' line ends the group and emits it.
+replyC :: Conduit B.ByteString IO [Reply]
+replyC =
+ line0 []
+ where
+
+ line0 acc = await >>= return () `maybe` \line -> do
+ let (code, (typ, text)) = B.splitAt 1 `second` B.splitAt 3 line
+ code' <- either (monadThrow . ProtocolError) return $
+ parseReplyCode code
+ case () of
+ _ | typ == B.pack "-" -> line0 (Reply code' text [] : acc)
+ | typ == B.pack "+" -> line0 . (: acc) . Reply code' text =<< rest []
+ | typ == B.pack " " -> do
+ yield $ reverse (Reply code' text [] : acc)
+ line0 []
+ | otherwise -> monadThrow $ ProtocolError $
+ cat "Malformed reply line type " (esc 1 typ) '.'
+
+ rest acc = -- data lines of a '+' reply, up to (excluding) the lone "." terminator; empty lines are skipped
+ await >>= \mline -> case mline of
+ Nothing -> return $ reverse acc
+ Just line | B.null line -> rest acc
+ | line == B.pack "." -> return $ reverse acc -- the terminator is not part of the data
+ | otherwise -> rest (line:acc)
+
+-- | Conduit taking raw 'ByteString' to 'Reply' blocks.
+repliesC :: Conduit B.ByteString IO [Reply]
+repliesC =
+ CB.lines =$= CL.map strip =$= replyC
where
- readReplies = do
- line <- parseReplyLine =<< hGetLine handle crlf maxLineLength
- case line of
- MidReply reply -> fmap (reply :) readReplies
- LastReply reply -> return [reply]
-
- parseReplyLine line =
- either (E.throwIO . ProtocolError) (parseReplyLine' typ text)
- (parseReplyCode code)
- where (code,(typ,text)) = B.splitAt 1 `second` B.splitAt 3 line
-
- parseReplyLine' typ text code
- | typ == B.pack "-" = return . MidReply $ Reply code text []
- | typ == B.pack "+" = (MidReply . Reply code text) `fmap` readData
- | typ == B.pack " " = return . LastReply $ Reply code text []
- | otherwise = E.throwIO . ProtocolError $
- cat "Malformed reply line type " (esc 1 typ) '.'
-
- readData = do
- line <- hGetLine handle (B.pack "\n") maxLineLength
- case (if B.last line == '\r' then B.init else id) line of
- line' | line == (B.pack ".\r") -> return []
- | any B.null [line, line'] -> readData
- | otherwise -> fmap (line' :) readData
-
- crlf = B.pack "\r\n"
- maxLineLength = 2^20
+ strip bs = case unsnoc bs of -- drop one trailing '\r' left over after splitting on '\n'
+ Just (bs', '\r') -> bs'
+ _ -> bs
--------------------------------------------------------------------------------
-- Data types
diff --git a/src/TorDNSEL/Util.hsc b/src/TorDNSEL/Util.hsc
index 5cea0bb..6bbffc3 100644
--- a/src/TorDNSEL/Util.hsc
+++ b/src/TorDNSEL/Util.hsc
@@ -30,6 +30,9 @@ module TorDNSEL.Util (
, replaceError
, handleError
+ -- * Show functions
+ , bshow
+
-- * Strict functions
, adjust'
, alter'
@@ -49,6 +52,7 @@ module TorDNSEL.Util (
, inet_htoa
, encodeBase16
, split
+ , unsnoc
, syncExceptions
, bracket'
, finally'
@@ -59,10 +63,13 @@ module TorDNSEL.Util (
, inBoundsOf
, htonl
, ntohl
- , hGetLine
, splitByDelimiter
, showUTCTime
+ -- * Conduit utilities
+ , takeC
+ , frameC
+
-- * Network functions
, bindUDPSocket
, bindListeningTCPSocket
@@ -116,9 +123,11 @@ import Data.Char
import Data.Dynamic (Dynamic)
import Data.List (foldl', intersperse)
import Data.Maybe (mapMaybe)
+import Data.Monoid
import qualified Data.ByteString.Char8 as B
-import qualified Data.ByteString.Internal as B
-import qualified Data.ByteString.Unsafe as B
+import qualified Data.ByteString.Lazy as BL
+import qualified Data.ByteString.Internal as B (c2w)
+import qualified Data.ByteString as B (hGetSome)
import Data.ByteString (ByteString)
import qualified Data.Map as M
import Data.Ratio (numerator, denominator, (%))
@@ -140,6 +149,9 @@ import System.Posix.Types (FileMode)
import Text.Printf (printf)
import Data.Binary (Binary(..))
+import qualified Data.Conduit as C
+import qualified Data.Conduit.Binary as CB
+
#include <netinet/in.h>
--------------------------------------------------------------------------------
@@ -240,6 +252,12 @@ handleError :: MonadError e m => (e -> m a) -> m a -> m a
handleError = flip catchError
--------------------------------------------------------------------------------
+-- Show functions
+-- | Render a value with 'show' as a Char8 'ByteString' ('B.pack' keeps only the low 8 bits of each character).
+bshow :: (Show a) => a -> B.ByteString
+bshow = B.pack . show
+
+--------------------------------------------------------------------------------
-- Strict functions
-- | Same as 'M.adjust', but the adjusting function is applied strictly.
@@ -322,6 +340,10 @@ encodeBase16 = B.pack . concat . B.foldr ((:) . toBase16 . B.c2w) []
split :: Int -> ByteString -> [ByteString]
split x = takeWhile (not . B.null) . map (B.take x) . iterate (B.drop x)
+-- | Split a 'ByteString' into all but its last character and that last character; 'Nothing' if it is empty.
+unsnoc :: ByteString -> Maybe (ByteString, Char)
+unsnoc bs | B.null bs = Nothing
+ | otherwise = Just (B.init bs, B.last bs)
-- | Try an action, catching -- roughly -- "synchronous" exceptions.
--
@@ -401,114 +423,18 @@ instance Error e => MonadError e Maybe where
foreign import ccall unsafe "htonl" htonl :: Word32 -> Word32
foreign import ccall unsafe "ntohl" ntohl :: Word32 -> Word32
--- | Read a line terminated by an arbitrary sequence of bytes from a handle. The
--- end-of-line sequence is stripped before returning the line. @maxLen@
--- specifies the maximum line length to read, not including the end-of-line
--- sequence. If the line length exceeds @maxLen@, return the first @maxLen@
--- bytes. If EOF is encountered, return the bytes preceding it. The handle
--- should be in 'LineBuffering' mode.
-hGetLine :: Handle -> ByteString -> Int -> IO ByteString
-hGetLine = error "hGetLine" -- XXX STUB
--- hGetLine h eol maxLen | B.null eol = B.hGet h maxLen
--- hGetLine h eol@(B.PS _ _ eolLen) maxLen
--- = wantReadableHandle "TorDNSEL.Util.hGetLine" h $ \handle_ -> do
--- case haBufferMode handle_ of
--- NoBuffering -> error "no buffering"
--- _other -> hGetLineBuffered handle_
---
--- where
--- hGetLineBuffered handle_ = do
--- let ref = haBuffer handle_
--- buf <- readIORef ref
--- hGetLineBufferedLoop handle_ ref buf 0 0 []
---
--- hGetLineBufferedLoop handle_ ref
--- buf at Buffer{ bufRPtr=r, bufWPtr=w, bufBuf=raw } !len !eolIx xss = do
--- (new_eolIx,off) <- findEOL eolIx r w raw
--- let new_len = len + off - r
---
--- if maxLen > 0 && new_len - new_eolIx > maxLen
--- -- If the line length exceeds maxLen, return a partial line.
--- then do
--- let maxOff = off - (new_len - maxLen)
--- writeIORef ref buf{ bufRPtr = maxOff }
--- mkBigPS . (:xss) =<< mkPS raw r maxOff
--- else if new_eolIx == eolLen
--- -- We have a complete line; strip the EOL sequence and return it.
--- then do
--- if w == off
--- then writeIORef ref buf{ bufRPtr=0, bufWPtr=0 }
--- else writeIORef ref buf{ bufRPtr = off }
--- if eolLen <= off - r
--- then mkBigPS . (:xss) =<< mkPS raw r (off - eolLen)
--- else fmap stripEOL . mkBigPS . (:xss) =<< mkPS raw r off
--- else do
--- xs <- mkPS raw r off
--- maybe_buf <- maybeFillReadBuffer (haFD handle_) True
--- (haIsStream handle_) buf{ bufWPtr=0, bufRPtr=0 }
--- case maybe_buf of
--- -- Nothing indicates we caught an EOF, and we may have a
--- -- partial line to return.
--- Nothing -> do
--- writeIORef ref buf{ bufRPtr=0, bufWPtr=0 }
--- if new_len > 0
--- then mkBigPS (xs:xss)
--- else ioe_EOF
--- Just new_buf ->
--- hGetLineBufferedLoop handle_ ref new_buf new_len new_eolIx
--- (xs:xss)
---
--- maybeFillReadBuffer fd is_line is_stream buf
--- = catch (Just `fmap` fillReadBuffer fd is_line is_stream buf)
--- (\e -> if isEOFError e then return Nothing else ioError e)
---
--- findEOL eolIx
--- | eolLen == 1 = findEOLChar (B.w2c $ B.unsafeHead eol)
--- | otherwise = findEOLSeq eolIx
---
--- findEOLChar eolChar r w raw
--- | r == w = return (0, r)
--- | otherwise = do
--- (!c,!r') <- readCharFromBuffer raw r
--- if c == eolChar
--- then return (1, r')
--- else findEOLChar eolChar r' w raw
---
--- -- find the end-of-line sequence, if there is one
--- findEOLSeq !eolIx r w raw
--- | eolIx == eolLen || r == w = return (eolIx, r)
--- | otherwise = do
--- (!c,!r') <- readCharFromBuffer raw r
--- findEOLSeq (next c eolIx + 1) r' w raw
---
--- -- get the next index into the EOL sequence we should match against
--- next !c !i = if i >= 0 && c /= eolIndex i then next c (table ! i) else i
---
--- eolIndex = B.w2c . B.unsafeIndex eol
---
--- -- build a match table for the Knuth-Morris-Pratt algorithm
--- table = runSTUArray (do
--- arr <- newArray_ (0, if eolLen == 1 then 1 else eolLen - 1)
--- zipWithM_ (writeArray arr) [0,1] [-1,0]
--- loop arr 2 0)
--- where
--- loop arr !t !p
--- | t >= eolLen = return arr
--- | eolIndex (t - 1) == eolIndex p
--- = let p' = p + 1 in writeArray arr t p' >> loop arr (t + 1) p'
--- | p > 0 = readArray arr p >>= loop arr t
--- | otherwise = writeArray arr t 0 >> loop arr (t + 1) p
---
--- stripEOL (B.PS p s l) = E.assert (new_len >= 0) . B.copy $ B.PS p s new_len
--- where new_len = l - eolLen
---
--- mkPS buf start end = B.create len $ \p -> do
--- B.memcpy_ptr_baoff p buf (fromIntegral start) (fromIntegral len)
--- return ()
--- where len = end - start
---
--- mkBigPS [ps] = return ps
--- mkBigPS pss = return $! B.concat (reverse pss)
+takeC :: Monad m => Int -> C.ConduitM ByteString o m ByteString
+takeC = fmap (mconcat . BL.toChunks) . CB.take -- at most the given number of bytes, as one strict ByteString
+
+-- | Consume input up to and including @delim@ and return the bytes before it; at end of stream, return everything read so far.
+-- FIXME Worst-case quadratic: while @delim@ is not found, each new chunk is appended to the accumulator and the whole accumulator is searched again.
+frameC :: Monad m => ByteString -> C.ConduitM ByteString o m ByteString
+frameC delim = loop $ B.pack "" where
+ loop acc = C.await >>=
+ return acc `maybe` \bs ->
+ case B.breakSubstring delim $ acc <> bs of
+ (h, t) | B.null t -> loop h
+ | otherwise -> h <$ C.leftover (B.drop (B.length delim) t) -- push back whatever followed the delimiter
-- | Split @bs@ into pieces delimited by @delimiter@, consuming the delimiter.
-- The result for overlapping delimiters is undefined.
diff --git a/tordnsel.cabal b/tordnsel.cabal
index 3173943..50e7f40 100644
--- a/tordnsel.cabal
+++ b/tordnsel.cabal
@@ -15,7 +15,7 @@ Maintainer: tup.tuple at googlemail.com, lunar at debian.org, andrew at torproject.o
Build-Type: Simple
Build-Depends: base>=2.0, network>=2.0, mtl>=1.0, unix>=1.0, stm>=2.0,
time>=1.0, HUnit>=1.1, binary>=0.4, bytestring>=0.9, array>=0.1, directory>=1.0,
- containers>=0.1, deepseq >= 1.3
+ containers>=0.1, conduit >= 1.0.0 && < 1.1.0, deepseq >= 1.3
Tested-With: GHC==6.6, GHC==6.8, GHC==6.10, GHC==6.12
Data-Files: config/tordnsel.conf.sample, contrib/cacti-input.pl,
contrib/tordnsel-init.d-script.sample, doc/tordnsel.8
More information about the tor-commits
mailing list