[tor-commits] [snowflake/master] Make probetest wait for a datachannel to open

cohosh at torproject.org cohosh at torproject.org
Thu Nov 5 21:49:44 UTC 2020


commit 4663599382e4db8167fcc23a1a890e24ebca517a
Author: Cecylia Bocovich <cohosh at torproject.org>
Date:   Thu Nov 5 16:48:00 2020 -0500

    Make probetest wait for a datachannel to open
---
 probetest/probetest.go |   7 +-
 proxy/snowflake.go     | 205 +++++++++++++++++++++++++++++++++++++------------
 2 files changed, 162 insertions(+), 50 deletions(-)

diff --git a/probetest/probetest.go b/probetest/probetest.go
index 1d2d6ef..70032da 100644
--- a/probetest/probetest.go
+++ b/probetest/probetest.go
@@ -51,7 +51,12 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
 		return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
 	}
 	pc.OnDataChannel(func(dc *webrtc.DataChannel) {
-		close(dataChan)
+		dc.OnOpen(func() {
+			close(dataChan)
+		})
+		dc.OnClose(func() {
+			dc.Close()
+		})
 	})
 
 	err = pc.SetRemoteDescription(*sdp)
diff --git a/proxy/snowflake.go b/proxy/snowflake.go
index 276ebed..0df0d17 100644
--- a/proxy/snowflake.go
+++ b/proxy/snowflake.go
@@ -352,7 +352,7 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
 // candidates is complete and the answer is available in LocalDescription.
 // Installs an OnDataChannel callback that creates a webRTCConn and passes it to
 // datachannelHandler.
-func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
+func makePeerConnection(offering bool, sdp *webrtc.SessionDescription,
 	config webrtc.Configuration,
 	dataChan chan struct{},
 	handler func(conn *webRTCConn, remoteAddr net.Addr)) (*webrtc.PeerConnection, error) {
@@ -361,67 +361,99 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
 	if err != nil {
 		return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
 	}
-	pc.OnDataChannel(func(dc *webrtc.DataChannel) {
-		log.Println("OnDataChannel")
-		close(dataChan)
 
-		pr, pw := io.Pipe()
-		conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+	if offering {
+		offer, err := pc.CreateOffer(nil)
+		// TODO: Potentially timeout and retry if ICE isn't working.
+		if err != nil {
+			log.Println("Failed to prepare offer", err)
+			pc.Close()
+			return nil, err
+		}
+		log.Println("WebRTC: Created offer")
+		err = pc.SetLocalDescription(offer)
+		if err != nil {
+			log.Println("Failed to prepare offer", err)
+			pc.Close()
+			return nil, err
+		}
+		log.Println("WebRTC: Set local description")
 
+		dc, err := pc.CreateDataChannel("test", &webrtc.DataChannelInit{})
+		if err != nil {
+			log.Printf("CreateDataChannel ERROR: %s", err)
+			return nil, err
+		}
 		dc.OnOpen(func() {
-			log.Println("OnOpen channel")
+			log.Println("WebRTC: DataChannel.OnOpen")
+			close(dataChan)
 		})
 		dc.OnClose(func() {
-			conn.lock.Lock()
-			defer conn.lock.Unlock()
-			log.Println("OnClose channel")
-			conn.dc = nil
+			log.Println("WebRTC: DataChannel.OnClose")
 			dc.Close()
-			pw.Close()
 		})
-		dc.OnMessage(func(msg webrtc.DataChannelMessage) {
-			var n int
-			n, err = pw.Write(msg.Data)
-			if err != nil {
-				if inerr := pw.CloseWithError(err); inerr != nil {
-					log.Printf("close with error generated an error: %v", inerr)
+	} else {
+		pc.OnDataChannel(func(dc *webrtc.DataChannel) {
+			log.Println("OnDataChannel")
+			close(dataChan)
+
+			pr, pw := io.Pipe()
+			conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+
+			dc.OnOpen(func() {
+				log.Println("OnOpen channel")
+			})
+			dc.OnClose(func() {
+				conn.lock.Lock()
+				defer conn.lock.Unlock()
+				log.Println("OnClose channel")
+				conn.dc = nil
+				dc.Close()
+				pw.Close()
+			})
+			dc.OnMessage(func(msg webrtc.DataChannelMessage) {
+				var n int
+				n, err = pw.Write(msg.Data)
+				if err != nil {
+					if inerr := pw.CloseWithError(err); inerr != nil {
+						log.Printf("close with error generated an error: %v", inerr)
+					}
 				}
-			}
-			if n != len(msg.Data) {
-				panic("short write")
-			}
-		})
-
-		go handler(conn, conn.RemoteAddr())
-	})
+				if n != len(msg.Data) {
+					panic("short write")
+				}
+			})
 
-	err = pc.SetRemoteDescription(*sdp)
-	if err != nil {
-		if inerr := pc.Close(); inerr != nil {
-			log.Printf("unable to call pc.Close after pc.SetRemoteDescription with error: %v", inerr)
+			go handler(conn, conn.RemoteAddr())
+		})
+		err = pc.SetRemoteDescription(*sdp)
+		if err != nil {
+			if inerr := pc.Close(); inerr != nil {
+				log.Printf("unable to call pc.Close after pc.SetRemoteDescription with error: %v", inerr)
+			}
+			return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
 		}
-		return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
-	}
-	log.Println("sdp offer successfully received.")
+		log.Println("sdp offer successfully received.")
 
-	log.Println("Generating answer...")
-	answer, err := pc.CreateAnswer(nil)
-	// blocks on ICE gathering. we need to add a timeout if needed
-	// not putting this in a separate go routine, because we need
-	// SetLocalDescription(answer) to be called before sendAnswer
-	if err != nil {
-		if inerr := pc.Close(); inerr != nil {
-			log.Printf("ICE gathering has generated an error when calling pc.Close: %v", inerr)
+		log.Println("Generating answer...")
+		answer, err := pc.CreateAnswer(nil)
+		// blocks on ICE gathering. we need to add a timeout if needed
+		// not putting this in a separate go routine, because we need
+		// SetLocalDescription(answer) to be called before sendAnswer
+		if err != nil {
+			if inerr := pc.Close(); inerr != nil {
+				log.Printf("ICE gathering has generated an error when calling pc.Close: %v", inerr)
+			}
+			return nil, err
 		}
-		return nil, err
-	}
 
-	err = pc.SetLocalDescription(answer)
-	if err != nil {
-		if err = pc.Close(); err != nil {
-			log.Printf("pc.Close after setting local description returned : %v", err)
+		err = pc.SetLocalDescription(answer)
+		if err != nil {
+			if err = pc.Close(); err != nil {
+				log.Printf("pc.Close after setting local description returned : %v", err)
+			}
+			return nil, err
 		}
-		return nil, err
 	}
 
 	return pc, nil
@@ -435,7 +467,7 @@ func runSession(sid string) {
 		return
 	}
 	dataChan := make(chan struct{})
-	pc, err := makePeerConnectionFromOffer(offer, config, dataChan, datachannelHandler)
+	pc, err := makePeerConnection(false, offer, config, dataChan, datachannelHandler)
 	if err != nil {
 		log.Printf("error making WebRTC connection: %s", err)
 		retToken()
@@ -535,6 +567,11 @@ func main() {
 	updateNATType(config.ICEServers)
 	log.Printf("NAT type: %s", currentNATType)
 
+	// use probetest to determine NAT compatibility
+	for {
+		testThroughput(config, "https://snowflake-broker.torproject.net:8443")
+	}
+
 	for {
 		getToken()
 		sessionID := genSessionID()
@@ -542,6 +579,76 @@ func main() {
 	}
 }
 
+func testThroughput(config webrtc.Configuration, probeURL string) {
+
+	var err error
+
+	probe := new(SignalingServer)
+	probe.transport = http.DefaultTransport.(*http.Transport)
+	broker.transport.(*http.Transport).ResponseHeaderTimeout = 30 * time.Second
+	probe.url, err = url.Parse(probeURL)
+	if err != nil {
+		log.Printf("Error parsing url: %s", err.Error())
+	}
+	probePath := probe.url.ResolveReference(&url.URL{Path: "probe"})
+
+	// create offer
+	dataChan := make(chan struct{})
+	pc, err := makePeerConnection(true, nil, config, dataChan, func(conn *webRTCConn, addr net.Addr) { conn.Close() })
+	if err != nil {
+		log.Printf("error making WebRTC connection: %s", err)
+		return
+	}
+
+	offer := pc.LocalDescription()
+	sdp, err := util.SerializeSessionDescription(offer)
+	if err != nil {
+		log.Printf("Error encoding probe message: %s", err.Error())
+		return
+	}
+
+	// send offer
+	body, err := messages.EncodePollResponse(sdp, true, "")
+	if err != nil {
+		log.Printf("Error encoding probe message: %s", err.Error())
+		return
+	}
+	log.Println(string(body))
+	resp, err := probe.Post(probePath.String(), bytes.NewBuffer(body))
+	if err != nil {
+		log.Printf("error polling probe: %s", err.Error())
+		return
+	}
+
+	sdp, _, err = messages.DecodeAnswerRequest(resp)
+	if err != nil {
+		log.Printf("Error reading probe response: %s", err.Error())
+		return
+	}
+	answer, err := util.DeserializeSessionDescription(sdp)
+	if err != nil {
+		log.Printf("Error setting answer: %s", err.Error())
+		return
+	}
+	err = pc.SetRemoteDescription(*answer)
+	if err != nil {
+		log.Printf("Error setting answer: %s", err.Error())
+		return
+	}
+
+	log.Println("Trying to open datachannel")
+	select {
+	case <-dataChan:
+		log.Println("Connection successful.")
+	case <-time.After(dataChannelTimeout):
+		log.Println("Timed out waiting for client to open data channel.")
+		if err := pc.Close(); err != nil {
+			log.Printf("error calling pc.Close: %v", err)
+		}
+	}
+
+}
+
 // use provided STUN server(s) to determine NAT type
 func updateNATType(servers []webrtc.ICEServer) {
 



More information about the tor-commits mailing list