[tor-commits] [snowflake/master] Separate peers.go file with improved documentation and more solid interfaces
serene at torproject.org
serene at torproject.org
Sun Jun 12 19:44:05 UTC 2016
commit c63f5cfc0a391af99cfa52ab90c05b9f0b253854
Author: Serene Han <keroserene+git at gmail.com>
Date: Sat Jun 11 18:24:08 2016 -0700
Separate peers.go file with improved documentation and more solid interfaces
---
client/client_test.go | 94 ++++++++++++++++++++++++++++++++++-----------------
client/interfaces.go | 28 +++++++++------
client/peers.go | 91 +++++++++++++++++++++++++++++++++++++++++++++++++
client/snowflake.go | 83 ++++-----------------------------------------
client/webrtc.go | 10 +++++-
5 files changed, 186 insertions(+), 120 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index 0bd3844..f58aeb0 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -9,7 +9,6 @@ import (
"strings"
"testing"
- // "git.torproject.org/pluggable-transports/goptlib.git"
"github.com/keroserene/go-webrtc"
. "github.com/smartystreets/goconvey/convey"
)
@@ -24,9 +23,7 @@ func (m *MockDataChannel) Send(data []byte) {
m.done <- true
}
-func (*MockDataChannel) Close() error {
- return nil
-}
+func (*MockDataChannel) Close() error { return nil }
type MockResponse struct{}
@@ -34,13 +31,9 @@ func (m *MockResponse) Read(p []byte) (int, error) {
p = []byte(`{"type":"answer","sdp":"fake"}`)
return 0, nil
}
-func (m *MockResponse) Close() error {
- return nil
-}
+func (m *MockResponse) Close() error { return nil }
-type MockTransport struct {
- statusOverride int
-}
+type MockTransport struct{ statusOverride int }
// Just returns a response with fake SDP answer.
func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
@@ -68,28 +61,17 @@ func (f FakeSocksConn) Reject() error {
f.rejected = true
return nil
}
-func (f FakeSocksConn) Grant(addr *net.TCPAddr) error {
- return nil
-}
-
-type FakeSnowflakeJar struct {
- toRelease *webRTCConn
-}
+func (f FakeSocksConn) Grant(addr *net.TCPAddr) error { return nil }
-func (f FakeSnowflakeJar) Release() *webRTCConn {
- return nil
-}
+type FakePeers struct{ toRelease *webRTCConn }
-func (f FakeSnowflakeJar) Collect() (*webRTCConn, error) {
- return nil, nil
-}
+func (f FakePeers) Collect() error { return nil }
+func (f FakePeers) Pop() *webRTCConn { return nil }
func TestSnowflakeClient(t *testing.T) {
-
- Convey("WebRTC ConnectLoop", t, func() {
-
+ SkipConvey("WebRTC ConnectLoop", t, func() {
Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() {
- snowflakes := NewSnowflakeJar(1)
+ snowflakes := NewPeers(1)
snowflakes.Tongue = FakeDialer{}
go ConnectLoop(snowflakes)
@@ -102,7 +84,7 @@ func TestSnowflakeClient(t *testing.T) {
})
Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() {
- snowflakes := NewSnowflakeJar(3)
+ snowflakes := NewPeers(3)
snowflakes.Tongue = FakeDialer{}
go ConnectLoop(snowflakes)
@@ -115,7 +97,7 @@ func TestSnowflakeClient(t *testing.T) {
})
Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() {
- snowflakes := NewSnowflakeJar(3)
+ snowflakes := NewPeers(3)
snowflakes.Tongue = FakeDialer{}
go ConnectLoop(snowflakes)
@@ -135,17 +117,67 @@ func TestSnowflakeClient(t *testing.T) {
})
})
+ Convey("Peers", t, func() {
+ Convey("Can construct", func() {
+ p := NewPeers(1)
+ So(p.capacity, ShouldEqual, 1)
+ So(p.current, ShouldEqual, nil)
+ So(p.snowflakeChan, ShouldNotBeNil)
+ So(cap(p.snowflakeChan), ShouldEqual, 1)
+ })
+
+ Convey("Collecting a Snowflake requires a Tongue.", func() {
+ p := NewPeers(1)
+ err := p.Collect()
+ So(err, ShouldNotBeNil)
+ So(p.Count(), ShouldEqual, 0)
+ // Set the dialer so that collection is possible.
+ p.Tongue = FakeDialer{}
+ err = p.Collect()
+ So(err, ShouldBeNil)
+ So(p.Count(), ShouldEqual, 1)
+ // S
+ err = p.Collect()
+ })
+
+ Convey("Collection continues until capacity.", func() {
+ c := 5
+ p := NewPeers(c)
+ p.Tongue = FakeDialer{}
+ // Fill up to capacity.
+ for i := 0 ; i < c ; i++ {
+ fmt.Println("Adding snowflake ", i)
+ err := p.Collect()
+ So(err, ShouldBeNil)
+ So(p.Count(), ShouldEqual, i + 1)
+ }
+ // But adding another gives an error.
+ So(p.Count(), ShouldEqual, c)
+ err := p.Collect()
+ So(err, ShouldNotBeNil)
+ So(p.Count(), ShouldEqual, c)
+
+ // But popping allows it to continue.
+ s := p.Pop()
+ So(s, ShouldNotBeNil)
+ So(p.Count(), ShouldEqual, c)
+
+ // err = p.Collect()
+ // So(err, ShouldNotBeNil)
+ // So(p.Count(), ShouldEqual, c)
+ })
+ })
+
Convey("Snowflake", t, func() {
SkipConvey("Handler Grants correctly", func() {
socks := &FakeSocksConn{}
- snowflakes := &FakeSnowflakeJar{}
+ snowflakes := &FakePeers{}
So(socks.rejected, ShouldEqual, false)
snowflakes.toRelease = nil
handler(socks, snowflakes)
So(socks.rejected, ShouldEqual, true)
-
})
Convey("WebRTC Connection", func() {
diff --git a/client/interfaces.go b/client/interfaces.go
index 80a2ba3..ba49a92 100644
--- a/client/interfaces.go
+++ b/client/interfaces.go
@@ -1,30 +1,36 @@
+// In the Client context, "Snowflake" refers to a remote browser proxy.
package main
import (
"net"
)
-// Interface for collecting and releasing snowflakes.
-type SnowflakeCollector interface {
- Collect() (*webRTCConn, error)
- Release() *webRTCConn
-}
-
-// Interface for catching those wild Snowflakes.
+// Interface for catching Snowflakes. (aka the remote dialer)
type Tongue interface {
Catch() (*webRTCConn, error)
}
-// Interface which primarily adapts to goptlib's SocksConn struct.
+// Interface for collecting some number of Snowflakes, for passing along
+// ultimately to the SOCKS handler.
+type SnowflakeCollector interface {
+
+ // Add a Snowflake to the collection.
+ // Implementation should decide how to connect and maintain the webRTCConn.
+ Collect() error
+
+ // Remove and return the most available Snowflake from the collection.
+ Pop() *webRTCConn
+}
+
+// Interface to adapt to goptlib's SocksConn struct.
type SocksConnector interface {
Grant(*net.TCPAddr) error
Reject() error
net.Conn
}
-// Interface for the Snowflake's transport.
-// (Specifically, webrtc.DataChannel)
-type SnowflakeChannel interface {
+// Interface for the Snowflake's transport. (Typically just webrtc.DataChannel)
+type SnowflakeDataChannel interface {
Send([]byte)
Close() error
}
diff --git a/client/peers.go b/client/peers.go
new file mode 100644
index 0000000..769174b
--- /dev/null
+++ b/client/peers.go
@@ -0,0 +1,91 @@
+package main
+
+import (
+ "errors"
+ "fmt"
+ "log"
+)
+
+// Container which keeps track of multiple WebRTC remote peers.
+// Implements |SnowflakeCollector|.
+//
+// Maintaining a set of pre-connected Peers with fresh but inactive datachannels
+// allows rapid recovery when the current WebRTC Peer disconnects.
+//
+// Note: For now, only one remote can be active at any given moment.
+// This is a property of Tor circuits & its current multiplexing constraints,
+// but could be updated if that changes.
+// (Also, this constraint does not necessarily apply to the more generic PT
+// version of Snowflake)
+type Peers struct {
+ Tongue
+ BytesLogger
+
+ snowflakeChan chan *webRTCConn
+ current *webRTCConn
+ capacity int
+ // TODO: Probably not necessary.
+ maxedChan chan struct{}
+}
+
+// Construct a fresh container of remote peers.
+func NewPeers(max int) *Peers {
+ p := &Peers{capacity: max, current: nil}
+ // Use buffered go channel to pass new snowflakes onwards to the SOCKS handler.
+ p.snowflakeChan = make(chan *webRTCConn, max)
+ p.maxedChan = make(chan struct{}, 1)
+ return p
+}
+
+// TODO: Needs fixing.
+func (p *Peers) Count() int {
+ count := 0
+ if p.current != nil {
+ count = 1
+ }
+ return count + len(p.snowflakeChan)
+}
+
+// As part of |SnowflakeCollector| interface.
+func (p *Peers) Collect() error {
+ if p.Count() >= p.capacity {
+ s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
+ p.maxedChan <- struct{}{}
+ return errors.New(s)
+ }
+ // Engage the Snowflake Catching interface, which must be available.
+ if nil == p.Tongue {
+ return errors.New("Missing Tongue to catch Snowflakes with.")
+ }
+ connection, err := p.Tongue.Catch()
+ if nil == connection || nil != err {
+ return err
+ }
+ // Use the same rate-limited traffic logger to keep consistency.
+ connection.BytesLogger = p.BytesLogger
+ p.snowflakeChan <- connection
+ return nil
+}
+
+// As part of |SnowflakeCollector| interface.
+func (p *Peers) Pop() *webRTCConn {
+ // Blocks until an available snowflake appears.
+ snowflake, ok := <-p.snowflakeChan
+ if !ok {
+ return nil
+ }
+ p.current = snowflake
+ snowflake.BytesLogger = p.BytesLogger
+ return snowflake
+}
+
+// Close all remote peers.
+func (p *Peers) End() {
+ log.Printf("WebRTC: interruped")
+ if nil != p.current {
+ p.current.Close()
+ }
+ for r := range p.snowflakeChan {
+ r.Close()
+ }
+}
diff --git a/client/snowflake.go b/client/snowflake.go
index cd7f151..f8edc2a 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -1,12 +1,10 @@
// Client transport plugin for the Snowflake pluggable transport.
-// In the Client context, "Snowflake" refers to a remote browser proxy.
package main
import (
"bufio"
"errors"
"flag"
- "fmt"
"io"
"log"
"net"
@@ -50,81 +48,12 @@ func copyLoop(a, b net.Conn) {
log.Println("copy loop ended")
}
-// Collect and track available snowflakes. (Implements SnowflakeCollector)
-// Right now, it is only possible to use one active remote in a circuit.
-// This can be updated once multiplexed transport on a single circuit is available.
-// Keeping multiple WebRTC connections available allows for quicker recovery when
-// the current snowflake disconnects.
-type SnowflakeJar struct {
- Tongue
- BytesLogger
-
- snowflakeChan chan *webRTCConn
- current *webRTCConn
- capacity int
- maxedChan chan struct{}
-}
-
-func NewSnowflakeJar(max int) *SnowflakeJar {
- p := &SnowflakeJar{capacity: max}
- p.snowflakeChan = make(chan *webRTCConn, max)
- p.maxedChan = make(chan struct{}, 1)
- return p
-}
-
-// Establish connection to some remote WebRTC peer, and keep it available for
-// later.
-func (jar *SnowflakeJar) Collect() (*webRTCConn, error) {
- if jar.Count() >= jar.capacity {
- s := fmt.Sprintf("At capacity [%d/%d]", jar.Count(), jar.capacity)
- jar.maxedChan <- struct{}{}
- return nil, errors.New(s)
- }
- snowflake, err := jar.Catch()
- if err != nil {
- return nil, err
- }
- jar.snowflakeChan <- snowflake
- return snowflake, nil
-}
-
-// Prepare and present an available remote WebRTC peer for active use.
-func (jar *SnowflakeJar) Release() *webRTCConn {
- snowflake, ok := <-jar.snowflakeChan
- if !ok {
- return nil
- }
- jar.current = snowflake
- snowflake.BytesLogger = jar.BytesLogger
- return snowflake
-}
-
-// TODO: Needs fixing.
-func (p *SnowflakeJar) Count() int {
- count := 0
- if p.current != nil {
- count = 1
- }
- return count + len(p.snowflakeChan)
-}
-
-// Close all remote peers.
-func (p *SnowflakeJar) End() {
- log.Printf("WebRTC: interruped")
- if nil != p.current {
- p.current.Close()
- }
- for r := range p.snowflakeChan {
- r.Close()
- }
-}
-
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
func ConnectLoop(snowflakes SnowflakeCollector) {
for {
- s, err := snowflakes.Collect()
- if nil == s || nil != err {
+ err := snowflakes.Collect()
+ if nil != err {
log.Println("WebRTC Error:", err,
" Retrying in", ReconnectTimeout, "seconds...")
// Failed collections get a timeout.
@@ -163,8 +92,8 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
defer func() {
handlerChan <- -1
}()
- // Wait for an available WebRTC remote...
- snowflake := snowflakes.Release()
+ // Obtain an available WebRTC remote. May block.
+ snowflake := snowflakes.Pop()
if nil == snowflake {
socks.Reject()
return errors.New("handler: Received invalid Snowflake")
@@ -247,10 +176,10 @@ func main() {
// Prepare WebRTC SnowflakeCollector and the Broker, then accumulate connections.
// TODO: Expose remote peer capacity as a flag?
- snowflakes := NewSnowflakeJar(SnowflakeCapacity)
+ snowflakes := NewPeers(SnowflakeCapacity)
broker := NewBrokerChannel(brokerURL, frontDomain, CreateBrokerTransport())
- snowflakes.Tongue = WebRTCDialer{broker}
+ snowflakes.Tongue = NewWebRTCDialer(broker)
// Use a real logger for traffic.
snowflakes.BytesLogger = &BytesSyncLogger{
diff --git a/client/webrtc.go b/client/webrtc.go
index 41642be..2466a1d 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -14,6 +14,14 @@ import (
// Implements the |Tongue| interface to catch snowflakes, using a BrokerChannel.
type WebRTCDialer struct {
*BrokerChannel
+ webrtcConfig *webrtc.Configuration
+}
+
+func NewWebRTCDialer(broker *BrokerChannel) *WebRTCDialer {
+ return &WebRTCDialer{
+ broker,
+ webrtc.NewConfiguration(iceServers...),
+ }
}
// Initialize a WebRTC Connection by signaling through the broker.
@@ -33,7 +41,7 @@ func (w WebRTCDialer) Catch() (*webRTCConn, error) {
type webRTCConn struct {
config *webrtc.Configuration
pc *webrtc.PeerConnection
- snowflake SnowflakeChannel // Holds the WebRTC DataChannel.
+ snowflake SnowflakeDataChannel // Holds the WebRTC DataChannel.
broker *BrokerChannel
offerChannel chan *webrtc.SessionDescription
More information about the tor-commits
mailing list