[tor-commits] [snowflake/master] client interfaces compose better, remove some globals, test ConnectLoop
serene at torproject.org
serene at torproject.org
Fri May 20 02:45:26 UTC 2016
commit 6b8568cc6cf7186d86e844a6f1460c9af802575a
Author: Serene Han <keroserene+git at gmail.com>
Date: Thu May 19 18:06:34 2016 -0700
client interfaces compose better, remove some globals, test ConnectLoop
---
client/client_test.go | 85 +++++++++++++++++++++++++-------
client/snowflake.go | 133 +++++++++++++++++++++++++++++++++++---------------
client/webrtc.go | 43 +++++++++-------
3 files changed, 186 insertions(+), 75 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index 41dc5e6..93e0422 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -2,6 +2,7 @@ package main
import (
"bytes"
+ "fmt"
"github.com/keroserene/go-webrtc"
. "github.com/smartystreets/goconvey/convey"
"io/ioutil"
@@ -48,9 +49,64 @@ func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return r, nil
}
-func TestConnect(t *testing.T) {
+type FakeDialer struct{}
+
+func (w FakeDialer) Catch() (*webRTCConn, error) {
+ fmt.Println("Caught a dummy snowflake.")
+ return &webRTCConn{}, nil
+}
+
+func TestSnowflakeClient(t *testing.T) {
Convey("Snowflake", t, func() {
- webrtcRemotes = make(map[int]*webRTCConn)
+
+ Convey("Peers", func() {
+
+ Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() {
+ peers := NewPeers(1)
+ peers.Tongue = FakeDialer{}
+
+ go ConnectLoop(peers)
+ <-peers.maxedChan
+
+ So(peers.Count(), ShouldEqual, 1)
+ r := <-peers.snowflakeChan
+ So(r, ShouldNotBeNil)
+ So(peers.Count(), ShouldEqual, 0)
+ })
+
+ Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() {
+ peers := NewPeers(3)
+ peers.Tongue = FakeDialer{}
+
+ go ConnectLoop(peers)
+ <-peers.maxedChan
+ So(peers.Count(), ShouldEqual, 3)
+ <-peers.snowflakeChan
+ <-peers.snowflakeChan
+ <-peers.snowflakeChan
+ So(peers.Count(), ShouldEqual, 0)
+ })
+
+ Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() {
+ peers := NewPeers(3)
+ peers.Tongue = FakeDialer{}
+
+ go ConnectLoop(peers)
+ <-peers.maxedChan
+ So(peers.Count(), ShouldEqual, 3)
+
+ r := <-peers.snowflakeChan
+ So(peers.Count(), ShouldEqual, 2)
+ r.Close()
+ <-peers.maxedChan
+ So(peers.Count(), ShouldEqual, 3)
+
+ <-peers.snowflakeChan
+ <-peers.snowflakeChan
+ <-peers.snowflakeChan
+ So(peers.Count(), ShouldEqual, 0)
+ })
+ })
Convey("WebRTC Connection", func() {
c := new(webRTCConn)
@@ -60,17 +116,13 @@ func TestConnect(t *testing.T) {
}
So(c.buffer.Bytes(), ShouldEqual, nil)
- Convey("Create and remove from WebRTCConn set", func() {
- So(len(webrtcRemotes), ShouldEqual, 0)
- So(remoteIndex, ShouldEqual, 0)
+ Convey("Can construct a WebRTCConn", func() {
s := NewWebRTCConnection(nil, nil)
So(s, ShouldNotBeNil)
So(s.index, ShouldEqual, 0)
- So(len(webrtcRemotes), ShouldEqual, 1)
- So(remoteIndex, ShouldEqual, 1)
+ So(s.offerChannel, ShouldNotBeNil)
+ So(s.answerChannel, ShouldNotBeNil)
s.Close()
- So(len(webrtcRemotes), ShouldEqual, 0)
- So(remoteIndex, ShouldEqual, 1)
})
Convey("Write buffers when datachannel is nil", func() {
@@ -113,9 +165,6 @@ func TestConnect(t *testing.T) {
<-c.reset
})
- Convey("Connect Loop", func() {
- // TODO
- })
})
})
@@ -124,14 +173,14 @@ func TestConnect(t *testing.T) {
transport := &MockTransport{http.StatusOK}
fakeOffer := webrtc.DeserializeSessionDescription("test")
- Convey("BrokerChannel with no front domain", func() {
+ Convey("Construct BrokerChannel with no front domain", func() {
b := NewBrokerChannel("test.broker", "", transport)
So(b.url, ShouldNotBeNil)
So(b.url.Path, ShouldResemble, "test.broker")
So(b.transport, ShouldNotBeNil)
})
- Convey("BrokerChannel with front domain", func() {
+ Convey("Construct BrokerChannel *with* front domain", func() {
b := NewBrokerChannel("test.broker", "front", transport)
So(b.url, ShouldNotBeNil)
So(b.url.Path, ShouldResemble, "test.broker")
@@ -139,7 +188,7 @@ func TestConnect(t *testing.T) {
So(b.transport, ShouldNotBeNil)
})
- Convey("BrokerChannel Negotiate responds with answer", func() {
+ Convey("BrokerChannel.Negotiate responds with answer", func() {
b := NewBrokerChannel("test.broker", "", transport)
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldBeNil)
@@ -147,7 +196,7 @@ func TestConnect(t *testing.T) {
So(answer.Sdp, ShouldResemble, "fake")
})
- Convey("BrokerChannel Negotiate fails with 503", func() {
+ Convey("BrokerChannel.Negotiate fails with 503", func() {
b := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusServiceUnavailable})
answer, err := b.Negotiate(fakeOffer)
@@ -156,7 +205,7 @@ func TestConnect(t *testing.T) {
So(err.Error(), ShouldResemble, BrokerError503)
})
- Convey("BrokerChannel Negotiate fails with 400", func() {
+ Convey("BrokerChannel.Negotiate fails with 400", func() {
b := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusBadRequest})
answer, err := b.Negotiate(fakeOffer)
@@ -165,7 +214,7 @@ func TestConnect(t *testing.T) {
So(err.Error(), ShouldResemble, BrokerError400)
})
- Convey("BrokerChannel Negotiate fails with unexpected", func() {
+ Convey("BrokerChannel.Negotiate fails with unexpected error", func() {
b := NewBrokerChannel("test.broker", "",
&MockTransport{123})
answer, err := b.Negotiate(fakeOffer)
diff --git a/client/snowflake.go b/client/snowflake.go
index f32ddc8..61864ca 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -5,6 +5,7 @@ import (
"bufio"
"errors"
"flag"
+ "fmt"
"io"
"log"
"net"
@@ -22,14 +23,12 @@ var ptInfo pt.ClientInfo
const (
ReconnectTimeout = 10
- SnowflakeCapacity = 3
+ SnowflakeCapacity = 1
)
var brokerURL string
var frontDomain string
var iceServers IceServerList
-var snowflakeChan = make(chan *webRTCConn, 1)
-var broker *BrokerChannel
// When a connection handler starts, +1 is written to this channel; when it
// ends, -1 is written.
@@ -50,62 +49,110 @@ func copyLoop(a, b net.Conn) {
log.Println("copy loop ended")
}
-// Interface that matches both webrtc.DataChannel and for testing.
+// Interface for catching Snowflakes.
+type Tongue interface {
+ Catch() (*webRTCConn, error)
+}
+
+// Interface for the Snowflake transport. (usually a webrtc.DataChannel)
type SnowflakeChannel interface {
Send([]byte)
Close() error
}
+// Collect and track available remote WebRTC Peers, to switch between if the
+// current one disconnects.
+// Right now, it is only possible to use one remote in a circuit. This can be
+// updated once multiplexed transport on a single circuit is available.
+type Peers struct {
+ Tongue
+
+ snowflakeChan chan *webRTCConn
+ current *webRTCConn
+ capacity int
+ maxedChan chan struct{}
+}
+
+func NewPeers(max int) *Peers {
+ p := &Peers{capacity: max}
+ p.snowflakeChan = make(chan *webRTCConn, max)
+ p.maxedChan = make(chan struct{}, 1)
+ return p
+}
+
+// Find, connect, and add a new peer to the internal collection.
+func (p *Peers) FindSnowflake() (*webRTCConn, error) {
+ if p.Count() >= p.capacity {
+ s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
+ p.maxedChan <- struct{}{}
+ return nil, errors.New(s)
+ }
+ connection, err := p.Catch()
+ if err != nil {
+ return nil, err
+ }
+ return connection, nil
+}
+
+// TODO: Needs fixing.
+func (p *Peers) Count() int {
+ return len(p.snowflakeChan)
+}
+
+// Close all remote peers.
+func (p *Peers) End() {
+ log.Printf("WebRTC: interruped")
+ if nil != p.current {
+ p.current.Close()
+ }
+ for r := range p.snowflakeChan {
+ r.Close()
+ }
+}
+
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
-func SnowflakeConnectLoop() {
- transport := CreateBrokerTransport()
- broker = NewBrokerChannel(brokerURL, frontDomain, transport)
+func ConnectLoop(peers *Peers) {
for {
- numRemotes := len(webrtcRemotes)
- if numRemotes >= SnowflakeCapacity {
- log.Println("At Capacity: ", numRemotes, "snowflake. Re-checking in 10s")
- <-time.After(time.Second * 10)
- continue
- }
- s, err := dialWebRTC()
+ s, err := peers.FindSnowflake()
if nil == s || nil != err {
- log.Println("WebRTC Error: ", err, " retrying...")
+ log.Println("WebRTC Error:", err,
+ " Retrying in", ReconnectTimeout, "seconds...")
<-time.After(time.Second * ReconnectTimeout)
continue
}
- snowflakeChan <- s
+ peers.snowflakeChan <- s
+ <-time.After(time.Second)
}
}
-// Initialize a WebRTC Connection.
-func dialWebRTC() (*webRTCConn, error) {
+// Implements |Tongue|
+type WebRTCDialer struct {
+ *BrokerChannel
+}
+
+// Initialize a WebRTC Connection by signaling through the broker.
+func (w WebRTCDialer) Catch() (*webRTCConn, error) {
+ if nil == w.BrokerChannel {
+ return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.")
+ }
// TODO: [#3] Fetch ICE server information from Broker.
// TODO: [#18] Consider TURN servers here too.
config := webrtc.NewConfiguration(iceServers...)
- if nil == broker {
- return nil, errors.New("Failed to prepare BrokerChannel")
- }
- connection := NewWebRTCConnection(config, broker)
+ connection := NewWebRTCConnection(config, w.BrokerChannel)
err := connection.Connect()
return connection, err
}
-func endWebRTC() {
- log.Printf("WebRTC: interruped")
- for _, r := range webrtcRemotes {
- r.Close()
- }
-}
-
// Establish a WebRTC channel for SOCKS connections.
-func handler(conn *pt.SocksConn) error {
+func handler(conn *pt.SocksConn, peers *Peers) error {
handlerChan <- 1
defer func() {
handlerChan <- -1
}()
// Wait for an available WebRTC remote...
- remote, ok := <-snowflakeChan
+ remote, ok := <-peers.snowflakeChan
+ peers.current = remote
if remote == nil || !ok {
conn.Reject()
return errors.New("handler: Received invalid Snowflake")
@@ -125,7 +172,7 @@ func handler(conn *pt.SocksConn) error {
return nil
}
-func acceptLoop(ln *pt.SocksListener) error {
+func acceptLoop(ln *pt.SocksListener, peers *Peers) error {
defer ln.Close()
for {
log.Println("SOCKS listening...", ln)
@@ -138,7 +185,7 @@ func acceptLoop(ln *pt.SocksListener) error {
return err
}
go func() {
- err := handler(conn)
+ err := handler(conn, peers)
if err != nil {
log.Printf("handler error: %s", err)
}
@@ -146,6 +193,7 @@ func acceptLoop(ln *pt.SocksListener) error {
}
}
+// TODO: Fix since multiplexing changes access to remotes.
func readSignalingMessages(f *os.File) {
log.Printf("readSignalingMessages")
s := bufio.NewScanner(f)
@@ -157,10 +205,10 @@ func readSignalingMessages(f *os.File) {
log.Printf("ignoring invalid signal message %+q", msg)
continue
}
- webrtcRemotes[0].answerChannel <- sdp
+ // webrtcRemotes[0].answerChannel <- sdp
}
log.Printf("close answerChannel")
- close(webrtcRemotes[0].answerChannel)
+ // close(webrtcRemotes[0].answerChannel)
if err := s.Err(); err != nil {
log.Printf("signal FIFO: %s", err)
}
@@ -204,8 +252,13 @@ func main() {
go readSignalingMessages(signalFile)
}
- webrtcRemotes = make(map[int]*webRTCConn)
- go SnowflakeConnectLoop()
+ // Prepare WebRTC Peers and the Broker, then accumulate connections.
+ // TODO: Expose remote peer capacity as a flag?
+ remotes := NewPeers(SnowflakeCapacity)
+ broker := NewBrokerChannel(brokerURL, frontDomain, CreateBrokerTransport())
+
+ remotes.Tongue = WebRTCDialer{broker}
+ go ConnectLoop(remotes)
ptInfo, err = pt.ClientSetup(nil)
if err != nil {
@@ -221,12 +274,13 @@ func main() {
for _, methodName := range ptInfo.MethodNames {
switch methodName {
case "snowflake":
+ // TODO: Be able to recover when SOCKS dies.
ln, err := pt.ListenSocks("tcp", "127.0.0.1:0")
if err != nil {
pt.CmethodError(methodName, err.Error())
break
}
- go acceptLoop(ln)
+ go acceptLoop(ln, remotes)
pt.Cmethod(methodName, ln.Version(), ln.Addr())
listeners = append(listeners, ln)
default:
@@ -234,7 +288,6 @@ func main() {
}
}
pt.CmethodsDone()
- defer endWebRTC()
var numHandlers int = 0
var sig os.Signal
@@ -254,6 +307,8 @@ func main() {
ln.Close()
}
+ remotes.End()
+
// wait for second signal or no more handlers
sig = nil
for sig == nil && numHandlers != 0 {
diff --git a/client/webrtc.go b/client/webrtc.go
index 5b30e95..e01cbf7 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -11,26 +11,27 @@ import (
"time"
)
-// Implements net.Conn interface
+// Remote WebRTC peer. Implements the |net.Conn| interface.
type webRTCConn struct {
- config *webrtc.Configuration
- pc *webrtc.PeerConnection
- snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel.
- broker *BrokerChannel
+ config *webrtc.Configuration
+ pc *webrtc.PeerConnection
+ snowflake SnowflakeChannel // Holds the WebRTC DataChannel.
+ broker *BrokerChannel
+
offerChannel chan *webrtc.SessionDescription
answerChannel chan *webrtc.SessionDescription
errorChannel chan error
+ endChannel chan struct{}
recvPipe *io.PipeReader
writePipe *io.PipeWriter
buffer bytes.Buffer
reset chan struct{}
- index int
+
+ index int
+ closed bool
*BytesInfo
}
-var webrtcRemotes map[int]*webRTCConn
-var remoteIndex int = 0
-
func (c *webRTCConn) Read(b []byte) (int, error) {
return c.recvPipe.Read(b)
}
@@ -51,10 +52,17 @@ func (c *webRTCConn) Close() error {
var err error = nil
log.Printf("WebRTC: Closing")
c.cleanup()
- close(c.offerChannel)
- close(c.answerChannel)
- close(c.errorChannel)
- delete(webrtcRemotes, c.index)
+ if nil != c.offerChannel {
+ close(c.offerChannel)
+ }
+ if nil != c.answerChannel {
+ close(c.answerChannel)
+ }
+ if nil != c.errorChannel {
+ close(c.errorChannel)
+ }
+ // Mark for deletion.
+ c.closed = true
return err
}
@@ -78,6 +86,7 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
return fmt.Errorf("SetWriteDeadline not implemented")
}
+// Construct a WebRTC PeerConnection.
func NewWebRTCConnection(config *webrtc.Configuration,
broker *BrokerChannel) *webRTCConn {
connection := new(webRTCConn)
@@ -90,6 +99,7 @@ func NewWebRTCConnection(config *webrtc.Configuration,
connection.errorChannel = make(chan error, 1)
connection.reset = make(chan struct{}, 1)
+ // TODO: Separate out.
// Log every few seconds.
connection.BytesInfo = &BytesInfo{
inboundChan: make(chan int, 5), outboundChan: make(chan int, 5),
@@ -99,9 +109,6 @@ func NewWebRTCConnection(config *webrtc.Configuration,
// Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe()
- connection.index = remoteIndex
- webrtcRemotes[connection.index] = connection
- remoteIndex++
return connection
}
@@ -296,12 +303,12 @@ func (c *webRTCConn) Reset() {
func (c *webRTCConn) cleanup() {
if nil != c.snowflake {
- s := c.snowflake
log.Printf("WebRTC: closing DataChannel")
+ dataChannel := c.snowflake
// Setting snowflake to nil *before* Close indicates to OnClose that it
// was locally triggered.
c.snowflake = nil
- s.Close()
+ dataChannel.Close()
}
if nil != c.pc {
log.Printf("WebRTC: closing PeerConnection")
More information about the tor-commits
mailing list