[tor-commits] [snowflake/master] Include answer channel as part of the webRTCConn struct (#12)
serene at torproject.org
serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016
commit a1b7e01c5423c69548afdf44c7455736605b5574
Author: Serene Han <keroserene+git at gmail.com>
Date: Wed Feb 17 20:41:33 2016 -0800
Include answer channel as part of the webRTCConn struct (#12)
---
client/client_test.go | 4 ++-
client/snowflake.go | 78 ++++++++++++++++++++++++++++++++-------------------
2 files changed, 52 insertions(+), 30 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index 546382f..ff15d6d 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -2,6 +2,7 @@ package main
import (
"bytes"
+ "github.com/keroserene/go-webrtc"
. "github.com/smartystreets/goconvey/convey"
"testing"
)
@@ -41,8 +42,9 @@ func TestConnect(t *testing.T) {
Convey("Receive answer fails on nil answer", func() {
c.reset = make(chan struct{})
+ c.answerChannel = make(chan *webrtc.SessionDescription)
c.ReceiveAnswer()
- answerChannel <- nil
+ c.answerChannel <- nil
<-c.reset
})
diff --git a/client/snowflake.go b/client/snowflake.go
index 6d612ce..081cb3b 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -28,7 +28,6 @@ var frontDomain string
// When a connection handler starts, +1 is written to this channel; when it
// ends, -1 is written.
var handlerChan = make(chan int)
-var answerChannel = make(chan *webrtc.SessionDescription)
const (
ReconnectTimeout = 5
@@ -37,7 +36,6 @@ const (
func copyLoop(a, b net.Conn) {
var wg sync.WaitGroup
wg.Add(2)
-
go func() {
io.Copy(b, a)
wg.Done()
@@ -46,8 +44,8 @@ func copyLoop(a, b net.Conn) {
io.Copy(a, b)
wg.Done()
}()
-
wg.Wait()
+ log.Println("copy loop ended")
}
// Interface that matches both webrtc.DataChannel and the stand-in used for testing.
@@ -58,16 +56,17 @@ type SnowflakeChannel interface {
// Implements net.Conn interface
type webRTCConn struct {
- config *webrtc.Configuration
- pc *webrtc.PeerConnection
- snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel.
- broker *BrokerChannel
- offerChannel chan *webrtc.SessionDescription
- errorChannel chan error
- recvPipe *io.PipeReader
- writePipe *io.PipeWriter
- buffer bytes.Buffer
- reset chan struct{}
+ config *webrtc.Configuration
+ pc *webrtc.PeerConnection
+ snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel.
+ broker *BrokerChannel
+ offerChannel chan *webrtc.SessionDescription
+ answerChannel chan *webrtc.SessionDescription
+ errorChannel chan error
+ recvPipe *io.PipeReader
+ writePipe *io.PipeWriter
+ buffer bytes.Buffer
+ reset chan struct{}
}
var webrtcRemote *webRTCConn
@@ -164,6 +163,9 @@ func (c *webRTCConn) EstablishDataChannel() error {
}
dc.OnOpen = func() {
log.Println("WebRTC: DataChannel.OnOpen")
+ // if nil != c.snowflake {
+ // panic("PeerConnection snowflake already exists.")
+ // }
// Flush the buffer, then enable datachannel.
// TODO: Make this more safe
dc.Send(c.buffer.Bytes())
@@ -175,14 +177,18 @@ func (c *webRTCConn) EstablishDataChannel() error {
// Disable the DataChannel as a write destination.
// Future writes will go to the buffer until a new DataChannel is available.
log.Println("WebRTC: DataChannel.OnClose")
- c.snowflake = nil
- c.reset <- struct{}{} // Attempt to negotiate a new datachannel..
+ if nil != c.snowflake {
+ c.snowflake = nil
+ // Only reset if this OnClose is the one that cleared the active DataChannel.
+ c.Reset()
+ }
}
dc.OnMessage = func(msg []byte) {
log.Printf("OnMessage <--- %d bytes", len(msg))
n, err := c.writePipe.Write(msg)
if err != nil {
// TODO: Maybe shouldn't actually close.
+ log.Println("Error writing to SOCKS pipe")
c.writePipe.CloseWithError(err)
}
if n != len(msg) {
@@ -205,7 +211,7 @@ func (c *webRTCConn) SendOffer() error {
log.Printf("----------------")
return nil
}
- // Use Broker...
+ // Otherwise, use Broker.
go func() {
log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL,
"\nFront URL: ", frontDomain)
@@ -214,7 +220,7 @@ func (c *webRTCConn) SendOffer() error {
log.Printf("BrokerChannel error: %s", err)
answer = nil
}
- answerChannel <- answer
+ c.answerChannel <- answer
}()
case err := <-c.errorChannel:
c.pc.Close()
@@ -225,11 +231,11 @@ func (c *webRTCConn) SendOffer() error {
func (c *webRTCConn) ReceiveAnswer() {
go func() {
- answer, ok := <-answerChannel
+ answer, ok := <-c.answerChannel
if !ok || nil == answer {
log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout)
<-time.After(time.Second * ReconnectTimeout)
- c.reset <- struct{}{}
+ c.Reset()
return
}
log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
@@ -247,6 +253,12 @@ func (c *webRTCConn) sendData(data []byte) {
c.buffer.Write(data)
return
}
+ // Otherwise, flush buffer if necessary.
+ for c.buffer.Len() > 0 {
+ c.snowflake.Send(c.buffer.Bytes())
+ log.Println("Flushed ", c.buffer.Len(), " bytes")
+ c.buffer.Reset()
+ }
log.Printf("Write %d bytes --> WebRTC", len(data))
c.snowflake.Send(data)
}
@@ -256,18 +268,25 @@ func (c *webRTCConn) ConnectLoop() {
for {
log.Println("Establishing WebRTC connection...")
// TODO: When go-webrtc is more stable, it's possible that a new
- // PeerConnection won't need to be recreated each time.
- // called once.
+ // PeerConnection won't need to be re-prepared each time.
c.PreparePeerConnection()
- c.EstablishDataChannel()
- c.SendOffer()
- c.ReceiveAnswer()
-
- <-c.reset
- log.Println(" --- snowflake connection reset ---")
+ err := c.EstablishDataChannel()
+ if err == nil {
+ c.SendOffer()
+ c.ReceiveAnswer()
+ <-c.reset
+ log.Println(" --- snowflake connection reset ---")
+ }
}
}
+func (c *webRTCConn) Reset() {
+ go func() {
+ c.reset <- struct{}{} // Attempt to negotiate a new DataChannel.
+ log.Println("WebRTC resetting...")
+ }()
+}
+
// Initialize a WebRTC Connection.
func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
*webRTCConn, error) {
@@ -275,6 +294,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
connection.config = config
connection.broker = broker
connection.offerChannel = make(chan *webrtc.SessionDescription)
+ connection.answerChannel = make(chan *webrtc.SessionDescription)
connection.errorChannel = make(chan error)
connection.reset = make(chan struct{})
// Pipes remain the same even when DataChannel gets switched.
@@ -363,10 +383,10 @@ func readSignalingMessages(f *os.File) {
log.Printf("ignoring invalid signal message %+q", msg)
continue
}
- answerChannel <- sdp
+ webrtcRemote.answerChannel <- sdp
}
log.Printf("close answerChannel")
- close(answerChannel)
+ close(webrtcRemote.answerChannel)
if err := s.Err(); err != nil {
log.Printf("signal FIFO: %s", err)
}
More information about the tor-commits mailing list