[tor-commits] [snowflake/master] interfaces.go, SnowflakeCollector, better composition
serene at torproject.org
serene at torproject.org
Sun Jun 12 19:44:05 UTC 2016
commit 556596cc5aa14eecb687bbfb44188d1e733b6855
Author: Serene Han <keroserene+git at gmail.com>
Date: Tue May 24 15:18:54 2016 -0700
interfaces.go, SnowflakeCollector, better composition
---
client/client_test.go | 124 +++++++++++++++++++++++++-------------
client/interfaces.go | 30 ++++++++++
client/snowflake.go | 161 +++++++++++++++++++++++---------------------------
client/webrtc.go | 18 ++++++
4 files changed, 205 insertions(+), 128 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index 1ccf206..0bd3844 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -3,12 +3,15 @@ package main
import (
"bytes"
"fmt"
- "github.com/keroserene/go-webrtc"
- . "github.com/smartystreets/goconvey/convey"
"io/ioutil"
+ "net"
"net/http"
"strings"
"testing"
+
+ // "git.torproject.org/pluggable-transports/goptlib.git"
+ "github.com/keroserene/go-webrtc"
+ . "github.com/smartystreets/goconvey/convey"
)
type MockDataChannel struct {
@@ -56,56 +59,93 @@ func (w FakeDialer) Catch() (*webRTCConn, error) {
return &webRTCConn{}, nil
}
+type FakeSocksConn struct {
+ net.Conn
+ rejected bool
+}
+
+func (f FakeSocksConn) Reject() error {
+ f.rejected = true
+ return nil
+}
+func (f FakeSocksConn) Grant(addr *net.TCPAddr) error {
+ return nil
+}
+
+type FakeSnowflakeJar struct {
+ toRelease *webRTCConn
+}
+
+func (f FakeSnowflakeJar) Release() *webRTCConn {
+ return nil
+}
+
+func (f FakeSnowflakeJar) Collect() (*webRTCConn, error) {
+ return nil, nil
+}
+
func TestSnowflakeClient(t *testing.T) {
- Convey("Snowflake", t, func() {
- Convey("Peers", func() {
+ Convey("WebRTC ConnectLoop", t, func() {
- Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() {
- peers := NewPeers(1)
- peers.Tongue = FakeDialer{}
+ Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() {
+ snowflakes := NewSnowflakeJar(1)
+ snowflakes.Tongue = FakeDialer{}
- go ConnectLoop(peers)
- <-peers.maxedChan
+ go ConnectLoop(snowflakes)
+ <-snowflakes.maxedChan
- So(peers.Count(), ShouldEqual, 1)
- r := <-peers.snowflakeChan
- So(r, ShouldNotBeNil)
- So(peers.Count(), ShouldEqual, 0)
- })
+ So(snowflakes.Count(), ShouldEqual, 1)
+ r := <-snowflakes.snowflakeChan
+ So(r, ShouldNotBeNil)
+ So(snowflakes.Count(), ShouldEqual, 0)
+ })
- Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() {
- peers := NewPeers(3)
- peers.Tongue = FakeDialer{}
-
- go ConnectLoop(peers)
- <-peers.maxedChan
- So(peers.Count(), ShouldEqual, 3)
- <-peers.snowflakeChan
- <-peers.snowflakeChan
- <-peers.snowflakeChan
- So(peers.Count(), ShouldEqual, 0)
- })
+ Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() {
+ snowflakes := NewSnowflakeJar(3)
+ snowflakes.Tongue = FakeDialer{}
+
+ go ConnectLoop(snowflakes)
+ <-snowflakes.maxedChan
+ So(snowflakes.Count(), ShouldEqual, 3)
+ <-snowflakes.snowflakeChan
+ <-snowflakes.snowflakeChan
+ <-snowflakes.snowflakeChan
+ So(snowflakes.Count(), ShouldEqual, 0)
+ })
- Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() {
- peers := NewPeers(3)
- peers.Tongue = FakeDialer{}
+ Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() {
+ snowflakes := NewSnowflakeJar(3)
+ snowflakes.Tongue = FakeDialer{}
- go ConnectLoop(peers)
- <-peers.maxedChan
- So(peers.Count(), ShouldEqual, 3)
+ go ConnectLoop(snowflakes)
+ <-snowflakes.maxedChan
+ So(snowflakes.Count(), ShouldEqual, 3)
- r := <-peers.snowflakeChan
- So(peers.Count(), ShouldEqual, 2)
- r.Close()
- <-peers.maxedChan
- So(peers.Count(), ShouldEqual, 3)
+ r := <-snowflakes.snowflakeChan
+ So(snowflakes.Count(), ShouldEqual, 2)
+ r.Close()
+ <-snowflakes.maxedChan
+ So(snowflakes.Count(), ShouldEqual, 3)
+
+ <-snowflakes.snowflakeChan
+ <-snowflakes.snowflakeChan
+ <-snowflakes.snowflakeChan
+ So(snowflakes.Count(), ShouldEqual, 0)
+ })
+ })
+
+ Convey("Snowflake", t, func() {
+
+ SkipConvey("Handler Grants correctly", func() {
+ socks := &FakeSocksConn{}
+ snowflakes := &FakeSnowflakeJar{}
+
+ So(socks.rejected, ShouldEqual, false)
+ snowflakes.toRelease = nil
+ handler(socks, snowflakes)
+ So(socks.rejected, ShouldEqual, true)
- <-peers.snowflakeChan
- <-peers.snowflakeChan
- <-peers.snowflakeChan
- So(peers.Count(), ShouldEqual, 0)
- })
})
Convey("WebRTC Connection", func() {
diff --git a/client/interfaces.go b/client/interfaces.go
new file mode 100644
index 0000000..80a2ba3
--- /dev/null
+++ b/client/interfaces.go
@@ -0,0 +1,30 @@
+package main
+
+import (
+ "net"
+)
+
+// Interface for collecting and releasing snowflakes.
+type SnowflakeCollector interface {
+ Collect() (*webRTCConn, error)
+ Release() *webRTCConn
+}
+
+// Interface for catching those wild Snowflakes.
+type Tongue interface {
+ Catch() (*webRTCConn, error)
+}
+
+// Interface which primarily adapts to goptlib's SocksConn struct.
+type SocksConnector interface {
+ Grant(*net.TCPAddr) error
+ Reject() error
+ net.Conn
+}
+
+// Interface for the Snowflake's transport.
+// (Specifically, webrtc.DataChannel)
+type SnowflakeChannel interface {
+ Send([]byte)
+ Close() error
+}
diff --git a/client/snowflake.go b/client/snowflake.go
index 464420c..cd7f151 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -1,4 +1,5 @@
// Client transport plugin for the Snowflake pluggable transport.
+// In the Client context, "Snowflake" refers to a remote browser proxy.
package main
import (
@@ -49,22 +50,12 @@ func copyLoop(a, b net.Conn) {
log.Println("copy loop ended")
}
-// Interface for catching Snowflakes.
-type Tongue interface {
- Catch() (*webRTCConn, error)
-}
-
-// Interface for the Snowflake transport. (usually a webrtc.DataChannel)
-type SnowflakeChannel interface {
- Send([]byte)
- Close() error
-}
-
-// Collect and track available remote WebRTC Peers, to switch between if the
-// current one disconnects.
-// Right now, it is only possible to use one remote in a circuit. This can be
-// updated once multiplexed transport on a single circuit is available.
-type Peers struct {
+// Collect and track available snowflakes. (Implements SnowflakeCollector)
+// Right now, it is only possible to use one active remote in a circuit.
+// This can be updated once multiplexed transport on a single circuit is available.
+// Keeping multiple WebRTC connections available allows for quicker recovery when
+// the current snowflake disconnects.
+type SnowflakeJar struct {
Tongue
BytesLogger
@@ -74,30 +65,42 @@ type Peers struct {
maxedChan chan struct{}
}
-func NewPeers(max int) *Peers {
- p := &Peers{capacity: max}
+func NewSnowflakeJar(max int) *SnowflakeJar {
+ p := &SnowflakeJar{capacity: max}
p.snowflakeChan = make(chan *webRTCConn, max)
p.maxedChan = make(chan struct{}, 1)
return p
}
-// Find, connect, and add a new peer to the internal collection.
-func (p *Peers) FindSnowflake() (*webRTCConn, error) {
- if p.Count() >= p.capacity {
- s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
- p.maxedChan <- struct{}{}
+// Establish connection to some remote WebRTC peer, and keep it available for
+// later.
+func (jar *SnowflakeJar) Collect() (*webRTCConn, error) {
+ if jar.Count() >= jar.capacity {
+ s := fmt.Sprintf("At capacity [%d/%d]", jar.Count(), jar.capacity)
+ jar.maxedChan <- struct{}{}
return nil, errors.New(s)
}
- connection, err := p.Catch()
- connection.BytesLogger = p.BytesLogger
+ snowflake, err := jar.Catch()
if err != nil {
return nil, err
}
- return connection, nil
+ jar.snowflakeChan <- snowflake
+ return snowflake, nil
+}
+
+// Prepare and present an available remote WebRTC peer for active use.
+func (jar *SnowflakeJar) Release() *webRTCConn {
+ snowflake, ok := <-jar.snowflakeChan
+ if !ok {
+ return nil
+ }
+ jar.current = snowflake
+ snowflake.BytesLogger = jar.BytesLogger
+ return snowflake
}
// TODO: Needs fixing.
-func (p *Peers) Count() int {
+func (p *SnowflakeJar) Count() int {
count := 0
if p.current != nil {
count = 1
@@ -106,7 +109,7 @@ func (p *Peers) Count() int {
}
// Close all remote peers.
-func (p *Peers) End() {
+func (p *SnowflakeJar) End() {
log.Printf("WebRTC: interruped")
if nil != p.current {
p.current.Close()
@@ -118,87 +121,71 @@ func (p *Peers) End() {
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
-func ConnectLoop(peers *Peers) {
+func ConnectLoop(snowflakes SnowflakeCollector) {
for {
- s, err := peers.FindSnowflake()
+ s, err := snowflakes.Collect()
if nil == s || nil != err {
log.Println("WebRTC Error:", err,
" Retrying in", ReconnectTimeout, "seconds...")
+ // Failed collections get a timeout.
<-time.After(time.Second * ReconnectTimeout)
continue
}
- peers.snowflakeChan <- s
+ // Successful collection gets rate limited to once per second.
<-time.After(time.Second)
}
}
-// Implements |Tongue|
-type WebRTCDialer struct {
- *BrokerChannel
-}
-
-// Initialize a WebRTC Connection by signaling through the broker.
-func (w WebRTCDialer) Catch() (*webRTCConn, error) {
- if nil == w.BrokerChannel {
- return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.")
+// Accept local SOCKS connections and pass them to the handler.
+func acceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error {
+ defer ln.Close()
+ for {
+ log.Println("SOCKS listening...", ln)
+ conn, err := ln.AcceptSocks()
+ log.Println("accepting", conn, err)
+ if err != nil {
+ if e, ok := err.(net.Error); ok && e.Temporary() {
+ continue
+ }
+ return err
+ }
+ err = handler(conn, snowflakes)
+ if err != nil {
+ log.Printf("handler error: %s", err)
+ }
}
- // TODO: [#3] Fetch ICE server information from Broker.
- // TODO: [#18] Consider TURN servers here too.
- config := webrtc.NewConfiguration(iceServers...)
- connection := NewWebRTCConnection(config, w.BrokerChannel)
- err := connection.Connect()
- return connection, err
}
-// Establish a WebRTC channel for SOCKS connections.
-func handler(conn *pt.SocksConn, peers *Peers) error {
+// Given an accepted SOCKS connection, establish a WebRTC connection to the
+// remote peer and exchange traffic.
+func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
handlerChan <- 1
defer func() {
handlerChan <- -1
}()
// Wait for an available WebRTC remote...
- remote, ok := <-peers.snowflakeChan
- peers.current = remote
- if remote == nil || !ok {
- conn.Reject()
+ snowflake := snowflakes.Release()
+ if nil == snowflake {
+ socks.Reject()
return errors.New("handler: Received invalid Snowflake")
}
- defer conn.Close()
+ defer socks.Close()
log.Println("handler: Snowflake assigned.")
-
- err := conn.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
+ err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
if err != nil {
return err
}
- go copyLoop(conn, remote)
+ // Begin exchanging data.
+ go copyLoop(socks, snowflake)
+
// When WebRTC resets, close the SOCKS connection, which induces new handler.
- <-remote.reset
+ // TODO: Double check this / fix it.
+ <-snowflake.reset
log.Println("---- Closed ---")
return nil
}
-func acceptLoop(ln *pt.SocksListener, peers *Peers) error {
- defer ln.Close()
- for {
- log.Println("SOCKS listening...", ln)
- conn, err := ln.AcceptSocks()
- log.Println("accepting", conn, err)
- if err != nil {
- if e, ok := err.(net.Error); ok && e.Temporary() {
- continue
- }
- return err
- }
- go func() {
- err := handler(conn, peers)
- if err != nil {
- log.Printf("handler error: %s", err)
- }
- }()
- }
-}
-
// TODO: Fix since multiplexing changes access to remotes.
func readSignalingMessages(f *os.File) {
log.Printf("readSignalingMessages")
@@ -258,19 +245,21 @@ func main() {
go readSignalingMessages(signalFile)
}
- // Prepare WebRTC Peers and the Broker, then accumulate connections.
+ // Prepare WebRTC SnowflakeCollector and the Broker, then accumulate connections.
// TODO: Expose remote peer capacity as a flag?
- remotes := NewPeers(SnowflakeCapacity)
+ snowflakes := NewSnowflakeJar(SnowflakeCapacity)
+
broker := NewBrokerChannel(brokerURL, frontDomain, CreateBrokerTransport())
+ snowflakes.Tongue = WebRTCDialer{broker}
- remotes.BytesLogger = &BytesSyncLogger{
+ // Use a real logger for traffic.
+ snowflakes.BytesLogger = &BytesSyncLogger{
inboundChan: make(chan int, 5), outboundChan: make(chan int, 5),
inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
}
- go remotes.BytesLogger.Log()
- remotes.Tongue = WebRTCDialer{broker}
- go ConnectLoop(remotes)
+ go ConnectLoop(snowflakes)
+ go snowflakes.BytesLogger.Log()
ptInfo, err = pt.ClientSetup(nil)
if err != nil {
@@ -292,7 +281,7 @@ func main() {
pt.CmethodError(methodName, err.Error())
break
}
- go acceptLoop(ln, remotes)
+ go acceptLoop(ln, snowflakes)
pt.Cmethod(methodName, ln.Version(), ln.Addr())
listeners = append(listeners, ln)
default:
@@ -319,7 +308,7 @@ func main() {
ln.Close()
}
- remotes.End()
+ snowflakes.End()
// wait for second signal or no more handlers
sig = nil
diff --git a/client/webrtc.go b/client/webrtc.go
index a47ac19..41642be 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -11,6 +11,24 @@ import (
"time"
)
+// Implements the |Tongue| interface to catch snowflakes, using a BrokerChannel.
+type WebRTCDialer struct {
+ *BrokerChannel
+}
+
+// Initialize a WebRTC Connection by signaling through the broker.
+func (w WebRTCDialer) Catch() (*webRTCConn, error) {
+ if nil == w.BrokerChannel {
+ return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.")
+ }
+ // TODO: [#3] Fetch ICE server information from Broker.
+ // TODO: [#18] Consider TURN servers here too.
+ config := webrtc.NewConfiguration(iceServers...)
+ connection := NewWebRTCConnection(config, w.BrokerChannel)
+ err := connection.Connect()
+ return connection, err
+}
+
// Remote WebRTC peer. Implements the |net.Conn| interface.
type webRTCConn struct {
config *webrtc.Configuration
More information about the tor-commits
mailing list