[tor-commits] [snowflake/master] Wait for data channel OnOpen before returning from NewWebRTCPeer.
dcf at torproject.org
dcf at torproject.org
Tue Apr 28 03:12:49 UTC 2020
commit 047d3214bfb46de07e5d9f223e4fb1ba24584c8a
Author: David Fifield <david at bamsoftware.com>
Date: Fri Apr 24 13:30:13 2020 -0600
Wait for data channel OnOpen before returning from NewWebRTCPeer.
Now callers cannot call Write without there being a DataChannel to write
to. This lets us remove the internal buffer and checks for transport ==
nil.
Don't set internal fields like writePipe, transport, and pc to nil when
closing; just close them and let them return errors if further calls are
made on them.
There's now a constant DataChannelTimeout that's separate from
SnowflakeTimeout (the latter is what checkForStaleness uses). Now we can
set the DataChannel timeout to a lower value, to quickly dispose of
unconnectable proxies, while still keeping the threshold for detecting
the failure of a once-working proxy at 30 seconds.
https://bugs.torproject.org/33897
---
client/lib/snowflake.go | 2 ++
client/lib/webrtc.go | 96 +++++++++++++------------------------------------
2 files changed, 26 insertions(+), 72 deletions(-)
diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go
index 27991b2..0076b79 100644
--- a/client/lib/snowflake.go
+++ b/client/lib/snowflake.go
@@ -17,6 +17,8 @@ import (
const (
ReconnectTimeout = 10 * time.Second
SnowflakeTimeout = 30 * time.Second
+ // How long to wait for the OnOpen callback on a DataChannel.
+ DataChannelTimeout = 30 * time.Second
)
type dummyAddr struct{}
diff --git a/client/lib/webrtc.go b/client/lib/webrtc.go
index b4c0aad..edc8ab4 100644
--- a/client/lib/webrtc.go
+++ b/client/lib/webrtc.go
@@ -1,7 +1,6 @@
package lib
import (
- "bytes"
"crypto/rand"
"encoding/hex"
"errors"
@@ -25,12 +24,10 @@ type WebRTCPeer struct {
recvPipe *io.PipeReader
writePipe *io.PipeWriter
lastReceive time.Time
- buffer bytes.Buffer
closed bool
- lock sync.Mutex // Synchronization for DataChannel destruction
- once sync.Once // Synchronization for PeerConnection destruction
+ once sync.Once // Synchronization for PeerConnection destruction
BytesLogger BytesLogger
}
@@ -70,16 +67,11 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) {
// Writes bytes out to remote WebRTC.
// As part of |io.ReadWriter|
func (c *WebRTCPeer) Write(b []byte) (int, error) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.BytesLogger.AddOutbound(len(b))
- // TODO: Buffering could be improved / separated out of WebRTCPeer.
- if nil == c.transport {
- log.Printf("Buffered %d bytes --> WebRTC", len(b))
- c.buffer.Write(b)
- } else {
- c.transport.Send(b)
+ err := c.transport.Send(b)
+ if err != nil {
+ return 0, err
}
+ c.BytesLogger.AddOutbound(len(b))
return len(b), nil
}
@@ -127,8 +119,9 @@ func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel
log.Println("WebRTC: Unable to SetRemoteDescription:", err)
return err
}
- err = c.establishDataChannel()
+ c.transport, err = c.establishDataChannel()
if err != nil {
+ log.Printf("establishDataChannel: %v", err)
// nolint: golint
return errors.New("WebRTC: Could not establish DataChannel")
}
@@ -177,13 +170,9 @@ func preparePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection
return pc, nil
}
-// Create a WebRTC DataChannel locally.
-func (c *WebRTCPeer) establishDataChannel() error {
- c.lock.Lock()
- defer c.lock.Unlock()
- if c.transport != nil {
- panic("Unexpected datachannel already exists!")
- }
+// Create a WebRTC DataChannel locally. Blocks until the data channel is open,
+// or a timeout or error occurs.
+func (c *WebRTCPeer) establishDataChannel() (*webrtc.DataChannel, error) {
ordered := true
dataChannelOptions := &webrtc.DataChannelInit{
Ordered: &ordered,
@@ -191,41 +180,15 @@ func (c *WebRTCPeer) establishDataChannel() error {
dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
if err != nil {
log.Printf("CreateDataChannel ERROR: %s", err)
- return err
+ return nil, err
}
+ openChannel := make(chan struct{})
dc.OnOpen(func() {
- c.lock.Lock()
- defer c.lock.Unlock()
log.Println("WebRTC: DataChannel.OnOpen")
- if nil != c.transport {
- panic("WebRTC: transport already exists.")
- }
- // Flush buffered outgoing SOCKS data if necessary.
- if c.buffer.Len() > 0 {
- dc.Send(c.buffer.Bytes())
- log.Println("Flushed", c.buffer.Len(), "bytes.")
- c.buffer.Reset()
- }
- // Then enable the datachannel.
- c.transport = dc
+ close(openChannel)
})
dc.OnClose(func() {
- c.lock.Lock()
- // Future writes will go to the buffer until a new DataChannel is available.
- if nil == c.transport {
- // Closed locally, as part of a reset.
- log.Println("WebRTC: DataChannel.OnClose [locally]")
- c.lock.Unlock()
- return
- }
- // Closed remotely, need to reset everything.
- // Disable the DataChannel as a write destination.
- log.Println("WebRTC: DataChannel.OnClose [remotely]")
- c.transport = nil
- dc.Close()
- // Unlock before Close'ing, since it calls cleanup and asks for the
- // lock to check if the transport needs to be be deleted.
- c.lock.Unlock()
+ log.Println("WebRTC: DataChannel.OnClose")
c.Close()
})
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
@@ -244,7 +207,14 @@ func (c *WebRTCPeer) establishDataChannel() error {
c.lastReceive = time.Now()
})
log.Println("WebRTC: DataChannel created.")
- return nil
+
+ select {
+ case <-openChannel:
+ return dc, nil
+ case <-time.After(DataChannelTimeout):
+ dc.Close()
+ return nil, errors.New("timeout waiting for DataChannel.OnOpen")
+ }
}
// exchangeSDP sends the local SDP offer to the Broker, awaits the SDP answer,
@@ -266,27 +236,10 @@ func exchangeSDP(broker *BrokerChannel, offer *webrtc.SessionDescription) *webrt
// Close all channels and transports
func (c *WebRTCPeer) cleanup() {
// Close this side of the SOCKS pipe.
- if nil != c.writePipe {
- c.writePipe.Close()
- c.writePipe = nil
- }
- c.lock.Lock()
+ c.writePipe.Close()
if nil != c.transport {
log.Printf("WebRTC: closing DataChannel")
- dataChannel := c.transport
- // Setting transport to nil *before* dc Close indicates to OnClose that
- // this was locally triggered.
- c.transport = nil
- // Release the lock before calling DeleteDataChannel (which in turn
- // calls Close on the dataChannel), but after nil'ing out the transport,
- // since otherwise we'll end up in the onClose handler in a deadlock.
- c.lock.Unlock()
- if c.pc == nil {
- panic("DataChannel w/o PeerConnection, not good.")
- }
- dataChannel.Close()
- } else {
- c.lock.Unlock()
+ c.transport.Close()
}
if nil != c.pc {
log.Printf("WebRTC: closing PeerConnection")
@@ -294,6 +247,5 @@ func (c *WebRTCPeer) cleanup() {
if nil != err {
log.Printf("Error closing peerconnection...")
}
- c.pc = nil
}
}
More information about the tor-commits
mailing list