[tor-commits] [snowflake/master] Make probetest wait for a datachannel to open
cohosh at torproject.org
cohosh at torproject.org
Thu Nov 5 21:49:44 UTC 2020
commit 4663599382e4db8167fcc23a1a890e24ebca517a
Author: Cecylia Bocovich <cohosh at torproject.org>
Date: Thu Nov 5 16:48:00 2020 -0500
Make probetest wait for a datachannel to open
---
probetest/probetest.go | 7 +-
proxy/snowflake.go | 205 +++++++++++++++++++++++++++++++++++++------------
2 files changed, 162 insertions(+), 50 deletions(-)
diff --git a/probetest/probetest.go b/probetest/probetest.go
index 1d2d6ef..70032da 100644
--- a/probetest/probetest.go
+++ b/probetest/probetest.go
@@ -51,7 +51,12 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
}
pc.OnDataChannel(func(dc *webrtc.DataChannel) {
- close(dataChan)
+ dc.OnOpen(func() {
+ close(dataChan)
+ })
+ dc.OnClose(func() {
+ dc.Close()
+ })
})
err = pc.SetRemoteDescription(*sdp)
diff --git a/proxy/snowflake.go b/proxy/snowflake.go
index 276ebed..0df0d17 100644
--- a/proxy/snowflake.go
+++ b/proxy/snowflake.go
@@ -352,7 +352,7 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
// candidates is complete and the answer is available in LocalDescription.
// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
// datachannelHandler.
-func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
+func makePeerConnection(offering bool, sdp *webrtc.SessionDescription,
config webrtc.Configuration,
dataChan chan struct{},
handler func(conn *webRTCConn, remoteAddr net.Addr)) (*webrtc.PeerConnection, error) {
@@ -361,67 +361,99 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
if err != nil {
return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
}
- pc.OnDataChannel(func(dc *webrtc.DataChannel) {
- log.Println("OnDataChannel")
- close(dataChan)
- pr, pw := io.Pipe()
- conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+ if offering {
+ offer, err := pc.CreateOffer(nil)
+ // TODO: Potentially timeout and retry if ICE isn't working.
+ if err != nil {
+ log.Println("Failed to prepare offer", err)
+ pc.Close()
+ return nil, err
+ }
+ log.Println("WebRTC: Created offer")
+ err = pc.SetLocalDescription(offer)
+ if err != nil {
+ log.Println("Failed to prepare offer", err)
+ pc.Close()
+ return nil, err
+ }
+ log.Println("WebRTC: Set local description")
+ dc, err := pc.CreateDataChannel("test", &webrtc.DataChannelInit{})
+ if err != nil {
+ log.Printf("CreateDataChannel ERROR: %s", err)
+ return nil, err
+ }
dc.OnOpen(func() {
- log.Println("OnOpen channel")
+ log.Println("WebRTC: DataChannel.OnOpen")
+ close(dataChan)
})
dc.OnClose(func() {
- conn.lock.Lock()
- defer conn.lock.Unlock()
- log.Println("OnClose channel")
- conn.dc = nil
+ log.Println("WebRTC: DataChannel.OnClose")
dc.Close()
- pw.Close()
})
- dc.OnMessage(func(msg webrtc.DataChannelMessage) {
- var n int
- n, err = pw.Write(msg.Data)
- if err != nil {
- if inerr := pw.CloseWithError(err); inerr != nil {
- log.Printf("close with error generated an error: %v", inerr)
+ } else {
+ pc.OnDataChannel(func(dc *webrtc.DataChannel) {
+ log.Println("OnDataChannel")
+ close(dataChan)
+
+ pr, pw := io.Pipe()
+ conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+
+ dc.OnOpen(func() {
+ log.Println("OnOpen channel")
+ })
+ dc.OnClose(func() {
+ conn.lock.Lock()
+ defer conn.lock.Unlock()
+ log.Println("OnClose channel")
+ conn.dc = nil
+ dc.Close()
+ pw.Close()
+ })
+ dc.OnMessage(func(msg webrtc.DataChannelMessage) {
+ var n int
+ n, err = pw.Write(msg.Data)
+ if err != nil {
+ if inerr := pw.CloseWithError(err); inerr != nil {
+ log.Printf("close with error generated an error: %v", inerr)
+ }
}
- }
- if n != len(msg.Data) {
- panic("short write")
- }
- })
-
- go handler(conn, conn.RemoteAddr())
- })
+ if n != len(msg.Data) {
+ panic("short write")
+ }
+ })
- err = pc.SetRemoteDescription(*sdp)
- if err != nil {
- if inerr := pc.Close(); inerr != nil {
- log.Printf("unable to call pc.Close after pc.SetRemoteDescription with error: %v", inerr)
+ go handler(conn, conn.RemoteAddr())
+ })
+ err = pc.SetRemoteDescription(*sdp)
+ if err != nil {
+ if inerr := pc.Close(); inerr != nil {
+ log.Printf("unable to call pc.Close after pc.SetRemoteDescription with error: %v", inerr)
+ }
+ return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
}
- return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
- }
- log.Println("sdp offer successfully received.")
+ log.Println("sdp offer successfully received.")
- log.Println("Generating answer...")
- answer, err := pc.CreateAnswer(nil)
- // blocks on ICE gathering. we need to add a timeout if needed
- // not putting this in a separate go routine, because we need
- // SetLocalDescription(answer) to be called before sendAnswer
- if err != nil {
- if inerr := pc.Close(); inerr != nil {
- log.Printf("ICE gathering has generated an error when calling pc.Close: %v", inerr)
+ log.Println("Generating answer...")
+ answer, err := pc.CreateAnswer(nil)
+ // blocks on ICE gathering. we need to add a timeout if needed
+ // not putting this in a separate go routine, because we need
+ // SetLocalDescription(answer) to be called before sendAnswer
+ if err != nil {
+ if inerr := pc.Close(); inerr != nil {
+ log.Printf("ICE gathering has generated an error when calling pc.Close: %v", inerr)
+ }
+ return nil, err
}
- return nil, err
- }
- err = pc.SetLocalDescription(answer)
- if err != nil {
- if err = pc.Close(); err != nil {
- log.Printf("pc.Close after setting local description returned : %v", err)
+ err = pc.SetLocalDescription(answer)
+ if err != nil {
+ if err = pc.Close(); err != nil {
+ log.Printf("pc.Close after setting local description returned : %v", err)
+ }
+ return nil, err
}
- return nil, err
}
return pc, nil
@@ -435,7 +467,7 @@ func runSession(sid string) {
return
}
dataChan := make(chan struct{})
- pc, err := makePeerConnectionFromOffer(offer, config, dataChan, datachannelHandler)
+ pc, err := makePeerConnection(false, offer, config, dataChan, datachannelHandler)
if err != nil {
log.Printf("error making WebRTC connection: %s", err)
retToken()
@@ -535,6 +567,11 @@ func main() {
updateNATType(config.ICEServers)
log.Printf("NAT type: %s", currentNATType)
+ // use probetest to determine NAT compatability
+ for {
+ testThroughput(config, "https://snowflake-broker.torproject.net:8443")
+ }
+
for {
getToken()
sessionID := genSessionID()
@@ -542,6 +579,76 @@ func main() {
}
}
+func testThroughput(config webrtc.Configuration, probeURL string) {
+
+ var err error
+
+ probe := new(SignalingServer)
+ probe.transport = http.DefaultTransport.(*http.Transport)
+ broker.transport.(*http.Transport).ResponseHeaderTimeout = 30 * time.Second
+ probe.url, err = url.Parse(probeURL)
+ if err != nil {
+ log.Printf("Error parsing url: %s", err.Error())
+ }
+ probePath := probe.url.ResolveReference(&url.URL{Path: "probe"})
+
+ // create offer
+ dataChan := make(chan struct{})
+ pc, err := makePeerConnection(true, nil, config, dataChan, func(conn *webRTCConn, addr net.Addr) { conn.Close() })
+ if err != nil {
+ log.Printf("error making WebRTC connection: %s", err)
+ return
+ }
+
+ offer := pc.LocalDescription()
+ sdp, err := util.SerializeSessionDescription(offer)
+ if err != nil {
+ log.Printf("Error encoding probe message: %s", err.Error())
+ return
+ }
+
+ // send offer
+ body, err := messages.EncodePollResponse(sdp, true, "")
+ if err != nil {
+ log.Printf("Error encoding probe message: %s", err.Error())
+ return
+ }
+ log.Println(string(body))
+ resp, err := probe.Post(probePath.String(), bytes.NewBuffer(body))
+ if err != nil {
+ log.Printf("error polling probe: %s", err.Error())
+ return
+ }
+
+ sdp, _, err = messages.DecodeAnswerRequest(resp)
+ if err != nil {
+ log.Printf("Error reading probe response: %s", err.Error())
+ return
+ }
+ answer, err := util.DeserializeSessionDescription(sdp)
+ if err != nil {
+ log.Printf("Error setting answer: %s", err.Error())
+ return
+ }
+ err = pc.SetRemoteDescription(*answer)
+ if err != nil {
+ log.Printf("Error setting answer: %s", err.Error())
+ return
+ }
+
+ log.Println("Trying to open datachannel")
+ select {
+ case <-dataChan:
+ log.Println("Connection successful.")
+ case <-time.After(dataChannelTimeout):
+ log.Println("Timed out waiting for client to open data channel.")
+ if err := pc.Close(); err != nil {
+ log.Printf("error calling pc.Close: %v", err)
+ }
+ }
+
+}
+
// use provided STUN server(s) to determine NAT type
func updateNATType(servers []webrtc.ICEServer) {
More information about the tor-commits
mailing list