[tor-commits] [snowflake/master] Buffer writes to DataChannel, remove blocking on openChannel (#12)
serene at torproject.org
serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016
commit eb7eb04ac01b7a7d2466958b35403c6729014a1f
Author: Serene Han <keroserene+git at gmail.com>
Date: Wed Feb 17 18:38:40 2016 -0800
Buffer writes to DataChannel, remove blocking on openChannel (#12)
---
client/client_test.go | 44 +++++++++++++++++++++++++
client/snowflake.go | 89 ++++++++++++++++++++++++++++-----------------------
2 files changed, 93 insertions(+), 40 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
new file mode 100644
index 0000000..7b8dad2
--- /dev/null
+++ b/client/client_test.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+ "bytes"
+ . "github.com/smartystreets/goconvey/convey"
+ "testing"
+)
+
+type MockDataChannel struct {
+ destination bytes.Buffer
+}
+
+func (m *MockDataChannel) Send(data []byte) {
+ m.destination.Write(data)
+}
+
+func (*MockDataChannel) Close() error {
+ return nil
+}
+
+func TestConnect(t *testing.T) {
+ Convey("Snowflake", t, func() {
+
+ Convey("WebRTC Connection", func() {
+ c := new(webRTCConn)
+ So(c.buffer.Bytes(), ShouldEqual, nil)
+
+ Convey("SendData buffers when datachannel is nil", func() {
+ c.sendData([]byte("test"))
+ c.snowflake = nil
+ So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
+ })
+
+ Convey("SendData sends to datachannel when not nil", func() {
+ mock := new(MockDataChannel)
+ c.snowflake = mock
+ c.sendData([]byte("test"))
+ So(c.buffer.Bytes(), ShouldEqual, nil)
+ So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
+ })
+ })
+
+ })
+}
diff --git a/client/snowflake.go b/client/snowflake.go
index 7c47fbb..771e90b 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -3,6 +3,7 @@ package main
import (
"bufio"
+ "bytes"
"errors"
"flag"
"fmt"
@@ -46,16 +47,22 @@ func copyLoop(a, b net.Conn) {
wg.Wait()
}
+// Interface that matches both webrtc.DataChannel and the mock used for testing.
+type SnowflakeChannel interface {
+ Send([]byte)
+ Close() error
+}
+
// Implements net.Conn interface
type webRTCConn struct {
pc *webrtc.PeerConnection
- dc *webrtc.DataChannel
+ snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel.
broker *BrokerChannel
- recvPipe *io.PipeReader
- writePipe *io.PipeWriter
offerChannel chan *webrtc.SessionDescription
errorChannel chan error
- openChannel chan struct{}
+ recvPipe *io.PipeReader
+ writePipe *io.PipeWriter
+ buffer bytes.Buffer
}
var webrtcRemote *webRTCConn
@@ -66,9 +73,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
func (c *webRTCConn) Write(b []byte) (int, error) {
// log.Printf("webrtc Write %d %+q", len(b), string(b))
- log.Printf("Write %d bytes --> WebRTC", len(b))
- // Buffer in case datachannel isn't available.
- c.dc.Send(b)
+ c.sendData(b)
return len(b), nil
}
@@ -98,21 +103,29 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
}
// Create a WebRTC DataChannel locally.
-// This triggers "OnNegotiationNeeded" which should prepare an SDP offer.
func (c *webRTCConn) EstablishDataChannel() error {
dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
+ // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
+ // an SDP offer while other goroutines operating on this struct handle the
+ // signaling. Eventually fires "OnOpen".
if err != nil {
log.Printf("CreateDataChannel: %s", err)
return err
}
dc.OnOpen = func() {
- log.Println("OnOpen channel")
- c.openChannel <- struct{}{}
+ log.Println("WebRTC: DataChannel.OnOpen")
+ // Flush the buffer, then enable datachannel.
+ // TODO: Make this safer; c.buffer and c.snowflake are touched here without any locking.
+ dc.Send(c.buffer.Bytes())
+ log.Println("Flushed ", c.buffer.Len(), " bytes")
+ c.buffer.Reset()
+ c.snowflake = dc
}
dc.OnClose = func() {
- log.Println("OnClose channel")
- // writePipe.Close()
- close(c.openChannel)
+ // Disable the DataChannel as a write destination.
+ // Future writes will go to the buffer until a new DataChannel is available.
+ log.Println("WebRTC: DataChannel.OnClose")
+ c.snowflake = nil
// TODO: (Issue #12) Should attempt to renegotiate at this point.
}
dc.OnMessage = func(msg []byte) {
@@ -126,7 +139,6 @@ func (c *webRTCConn) EstablishDataChannel() error {
panic("short write")
}
}
- c.dc = dc
return nil
}
@@ -153,8 +165,8 @@ func (c *webRTCConn) sendOffer() error {
}
if nil == answer {
log.Printf("BrokerChannel: No answer received.")
+ // TODO: Should try again here.
return
- // return errors.New("No answer received.")
}
answerChannel <- answer
}()
@@ -165,6 +177,18 @@ func (c *webRTCConn) sendOffer() error {
return nil
}
+func (c *webRTCConn) sendData(data []byte) {
+ // Buffer the data in case datachannel isn't available yet.
+ if nil == c.snowflake {
+ log.Printf("Buffered %d bytes --> WebRTC", len(data))
+ c.buffer.Write(data)
+ return
+ }
+ log.Printf("Write %d bytes --> WebRTC", len(data))
+ c.snowflake.Send(data)
+}
+
+// Initialize a WebRTC Connection.
func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
*webRTCConn, error) {
pc, err := webrtc.NewPeerConnection(config)
@@ -177,13 +201,14 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
connection.pc = pc
connection.offerChannel = make(chan *webrtc.SessionDescription)
connection.errorChannel = make(chan error)
- connection.openChannel = make(chan struct{})
+ // Pipes remain the same even when DataChannel gets switched.
+ connection.recvPipe, connection.writePipe = io.Pipe()
- // Triggered by CreateDataChannel.
pc.OnNegotiationNeeded = func() {
log.Println("OnNegotiationNeeded")
go func() {
offer, err := pc.CreateOffer()
+ // TODO: Potentially timeout and retry if ICE isn't working.
if err != nil {
connection.errorChannel <- err
return
@@ -208,18 +233,17 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
// of the data channel, not the remote peer.
pc.OnDataChannel = func(channel *webrtc.DataChannel) {
log.Println("OnDataChannel")
- panic("OnDataChannel")
+ panic("Unexpected OnDataChannel!")
}
- // Pipes remain the same even when DataChannel gets switched.
- connection.recvPipe, connection.writePipe = io.Pipe()
-
connection.EstablishDataChannel()
- connection.sendOffer()
+ // TODO: Make this part of a re-establishment loop.
+ connection.sendOffer()
log.Printf("waiting for answer...")
answer, ok := <-answerChannel
if !ok {
+ // TODO: Don't just fail, try again!
pc.Close()
return nil, fmt.Errorf("no answer received")
}
@@ -230,15 +254,6 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
return nil, err
}
- // Wait until data channel is open; otherwise for example sends may get
- // lost.
- // TODO: Buffering *should* work though.
- _, ok = <-connection.openChannel
- if !ok {
- pc.Close()
- return nil, fmt.Errorf("failed to open data channel")
- }
-
return connection, nil
}
@@ -247,9 +262,9 @@ func endWebRTC() {
if nil == webrtcRemote {
return
}
- if nil != webrtcRemote.dc {
+ if nil != webrtcRemote.snowflake {
log.Printf("WebRTC: closing DataChannel")
- webrtcRemote.dc.Close()
+ webrtcRemote.snowflake.Close()
}
if nil != webrtcRemote.pc {
log.Printf("WebRTC: closing PeerConnection")
@@ -332,7 +347,7 @@ func readSignalingMessages(f *os.File) {
func main() {
var err error
-
+ webrtc.SetLoggingVerbosity(1)
flag.StringVar(&brokerURL, "url", "", "URL of signaling broker")
flag.StringVar(&frontDomain, "front", "", "front domain")
flag.Parse()
@@ -368,8 +383,6 @@ func main() {
go readSignalingMessages(signalFile)
}
- webrtc.SetLoggingVerbosity(1)
-
ptInfo, err = pt.ClientSetup(nil)
if err != nil {
log.Fatal(err)
@@ -417,10 +430,6 @@ func main() {
ln.Close()
}
- // if syscall.SIGTERM == sig || syscall.SIGINT == sig {
- // return
- // }
-
// wait for second signal or no more handlers
sig = nil
for sig == nil && numHandlers != 0 {
More information about the tor-commits
mailing list