[tor-commits] [snowflake/master] Snowflake client now using a reconnect loop (#12)
serene at torproject.org
serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016
commit f205a0be59aec40a04580cd46dad1bf6e9eba6c3
Author: Serene Han <keroserene+git at gmail.com>
Date: Wed Feb 17 19:19:11 2016 -0800
Snowflake client now using a reconnect loop (#12)
---
client/client_test.go | 4 ++
client/snowflake.go | 151 +++++++++++++++++++++++++++++---------------------
2 files changed, 93 insertions(+), 62 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index 7b8dad2..399549a 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -38,6 +38,10 @@ func TestConnect(t *testing.T) {
So(c.buffer.Bytes(), ShouldEqual, nil)
So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
})
+
+ Convey("Connect Loop", func() {
+ // TODO
+ })
})
})
diff --git a/client/snowflake.go b/client/snowflake.go
index 771e90b..907c8ae 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -28,9 +28,12 @@ var frontDomain string
// When a connection handler starts, +1 is written to this channel; when it
// ends, -1 is written.
var handlerChan = make(chan int)
-
var answerChannel = make(chan *webrtc.SessionDescription)
+const (
+ ReconnectTimeout = 5
+)
+
func copyLoop(a, b net.Conn) {
var wg sync.WaitGroup
wg.Add(2)
@@ -55,14 +58,16 @@ type SnowflakeChannel interface {
// Implements net.Conn interface
type webRTCConn struct {
+ config *webrtc.Configuration
pc *webrtc.PeerConnection
- snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel.
+ snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel.
broker *BrokerChannel
offerChannel chan *webrtc.SessionDescription
errorChannel chan error
recvPipe *io.PipeReader
writePipe *io.PipeWriter
buffer bytes.Buffer
+ reset chan struct{}
}
var webrtcRemote *webRTCConn
@@ -72,7 +77,6 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
}
func (c *webRTCConn) Write(b []byte) (int, error) {
- // log.Printf("webrtc Write %d %+q", len(b), string(b))
c.sendData(b)
return len(b), nil
}
@@ -102,10 +106,56 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
return fmt.Errorf("SetWriteDeadline not implemented")
}
+func (c *webRTCConn) PreparePeerConnection() {
+ if nil != c.pc {
+ log.Printf("PeerConnection already exists.")
+ c.pc.Close()
+ c.pc = nil
+ }
+ pc, err := webrtc.NewPeerConnection(c.config)
+ if err != nil {
+ log.Printf("NewPeerConnection: %s", err)
+ c.errorChannel <- err
+ }
+ // Prepare PeerConnection callbacks.
+ pc.OnNegotiationNeeded = func() {
+ log.Println("WebRTC: OnNegotiationNeeded")
+ go func() {
+ offer, err := pc.CreateOffer()
+ // TODO: Potentially timeout and retry if ICE isn't working.
+ if err != nil {
+ c.errorChannel <- err
+ return
+ }
+ err = pc.SetLocalDescription(offer)
+ if err != nil {
+ c.errorChannel <- err
+ return
+ }
+ }()
+ }
+ pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
+ log.Printf("OnIceCandidate %s", candidate.Serialize())
+ // Allow candidates to accumulate until OnIceComplete.
+ }
+ // TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
+ pc.OnIceComplete = func() {
+ log.Printf("OnIceComplete")
+ c.offerChannel <- pc.LocalDescription()
+ }
+ // This callback is not expected, as the Client initiates the creation
+ // of the data channel, not the remote peer.
+ pc.OnDataChannel = func(channel *webrtc.DataChannel) {
+ log.Println("OnDataChannel")
+ panic("Unexpected OnDataChannel!")
+ }
+ c.pc = pc
+}
+
// Create a WebRTC DataChannel locally.
func (c *webRTCConn) EstablishDataChannel() error {
dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
- // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
+ // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
// an SDP offer while other goroutines operating on this struct handle the
// signaling. Eventually fires "OnOpen".
if err != nil {
@@ -126,7 +176,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
// Future writes will go to the buffer until a new DataChannel is available.
log.Println("WebRTC: DataChannel.OnClose")
c.snowflake = nil
- // TODO: (Issue #12) Should attempt to renegotiate at this point.
+ c.reset <- struct{}{} // Attempt to negotiate a new datachannel..
}
dc.OnMessage = func(msg []byte) {
log.Printf("OnMessage <--- %d bytes", len(msg))
@@ -144,7 +194,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
// Block until an offer is available, then send it to either
// the Broker or signal pipe.
-func (c *webRTCConn) sendOffer() error {
+func (c *webRTCConn) SendOffer() error {
select {
case offer := <-c.offerChannel:
if "" == brokerURL {
@@ -166,6 +216,7 @@ func (c *webRTCConn) sendOffer() error {
if nil == answer {
log.Printf("BrokerChannel: No answer received.")
// TODO: Should try again here.
+ c.reset <- struct{}{}
return
}
answerChannel <- answer
@@ -177,6 +228,19 @@ func (c *webRTCConn) sendOffer() error {
return nil
}
+func (c *webRTCConn) ReceiveAnswer() error {
+ log.Printf("waiting for answer...")
+ answer, ok := <-answerChannel
+ if !ok {
+ // TODO: Don't just fail, try again!
+ c.pc.Close()
+ // connection.errorChannel <- errors.New("Bad answer")
+ return errors.New("Bad answer")
+ }
+ log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
+ return c.pc.SetRemoteDescription(answer)
+}
+
func (c *webRTCConn) sendData(data []byte) {
// Buffer the data in case datachannel isn't available yet.
if nil == c.snowflake {
@@ -188,72 +252,35 @@ func (c *webRTCConn) sendData(data []byte) {
c.snowflake.Send(data)
}
+// WebRTC re-establishment loop. Expected in own goroutine.
+func (c *webRTCConn) ConnectLoop() {
+ for {
+ log.Println("Establishing WebRTC connection...")
+ // TODO: When go-webrtc is more stable, it's possible that a new
+ // PeerConnection won't need to be recreated each time.
+ // called once.
+ c.PreparePeerConnection()
+ c.EstablishDataChannel()
+ c.SendOffer()
+ c.ReceiveAnswer()
+ <-c.reset
+ log.Println(" --- snowflake connection reset ---")
+ }
+}
+
// Initialize a WebRTC Connection.
func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
*webRTCConn, error) {
- pc, err := webrtc.NewPeerConnection(config)
- if err != nil {
- log.Printf("NewPeerConnection: %s", err)
- return nil, err
- }
connection := new(webRTCConn)
+ connection.config = config
connection.broker = broker
- connection.pc = pc
connection.offerChannel = make(chan *webrtc.SessionDescription)
connection.errorChannel = make(chan error)
+ connection.reset = make(chan struct{})
// Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe()
- pc.OnNegotiationNeeded = func() {
- log.Println("OnNegotiationNeeded")
- go func() {
- offer, err := pc.CreateOffer()
- // TODO: Potentially timeout and retry if ICE isn't working.
- if err != nil {
- connection.errorChannel <- err
- return
- }
- err = pc.SetLocalDescription(offer)
- if err != nil {
- connection.errorChannel <- err
- return
- }
- }()
- }
- pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
- log.Printf("OnIceCandidate %s", candidate.Serialize())
- // Allow candidates to accumulate until OnIceComplete.
- }
- // TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
- pc.OnIceComplete = func() {
- log.Printf("OnIceComplete")
- connection.offerChannel <- pc.LocalDescription()
- }
- // This callback is not expected, as the Client initiates the creation
- // of the data channel, not the remote peer.
- pc.OnDataChannel = func(channel *webrtc.DataChannel) {
- log.Println("OnDataChannel")
- panic("Unexpected OnDataChannel!")
- }
-
- connection.EstablishDataChannel()
-
- // TODO: Make this part of a re-establishment loop.
- connection.sendOffer()
- log.Printf("waiting for answer...")
- answer, ok := <-answerChannel
- if !ok {
- // TODO: Don't just fail, try again!
- pc.Close()
- return nil, fmt.Errorf("no answer received")
- }
- log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
- err = pc.SetRemoteDescription(answer)
- if err != nil {
- pc.Close()
- return nil, err
- }
-
+ go connection.ConnectLoop()
return connection, nil
}
More information about the tor-commits
mailing list