[tor-commits] [snowflake/master] Update broker--proxy protocol with proxy type

cohosh at torproject.org cohosh at torproject.org
Thu Nov 28 18:54:16 UTC 2019


commit 7277bb37cd8a96afd8516870cc286b3845fa48bb
Author: Cecylia Bocovich <cohosh at torproject.org>
Date:   Wed Nov 20 12:41:53 2019 -0500

    Update broker--proxy protocol with proxy type
    
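    Proxies now include their type (badge, webext, or standalone) when they
    poll for client offers. The broker saves this type along with each
    snowflake id and outputs per-type proxy counts on the /debug page;
    proxies that send no recognized type are counted as unknown. The message
    version is bumped to 1.1, and the broker now accepts any 1.x version.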
---
 broker/broker.go                | 26 +++++++++++++++++--------
 broker/snowflake-broker_test.go | 14 +++++++-------
 broker/snowflake-heap.go        |  1 +
 common/messages/proxy.go        | 42 ++++++++++++++++++++++++-----------------
 common/messages/proxy_test.go   | 28 +++++++++++++++++++++------
 proxy-go/snowflake.go           |  2 +-
 6 files changed, 74 insertions(+), 39 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index 13d2575..3edfe84 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -97,14 +97,16 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 // Proxies may poll for client offers concurrently.
 type ProxyPoll struct {
 	id           string
+	ptype        string
 	offerChannel chan []byte
 }
 
 // Registers a Snowflake and waits for some Client to send an offer,
 // as part of the polling logic of the proxy handler.
-func (ctx *BrokerContext) RequestOffer(id string) []byte {
+func (ctx *BrokerContext) RequestOffer(id string, ptype string) []byte {
 	request := new(ProxyPoll)
 	request.id = id
+	request.ptype = ptype
 	request.offerChannel = make(chan []byte)
 	ctx.proxyPolls <- request
 	// Block until an offer is available, or timeout which sends a nil offer.
@@ -117,7 +119,7 @@ func (ctx *BrokerContext) RequestOffer(id string) []byte {
 // client offer or nil on timeout / none are available.
 func (ctx *BrokerContext) Broker() {
 	for request := range ctx.proxyPolls {
-		snowflake := ctx.AddSnowflake(request.id)
+		snowflake := ctx.AddSnowflake(request.id, request.ptype)
 		// Wait for a client to avail an offer to the snowflake.
 		go func(request *ProxyPoll) {
 			select {
@@ -137,10 +139,11 @@ func (ctx *BrokerContext) Broker() {
 // Create and add a Snowflake to the heap.
 // Required to keep track of proxies between providing them
 // with an offer and awaiting their second POST with an answer.
-func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake {
+func (ctx *BrokerContext) AddSnowflake(id string, ptype string) *Snowflake {
 	snowflake := new(Snowflake)
 	snowflake.id = id
 	snowflake.clients = 0
+	snowflake.ptype = ptype
 	snowflake.offerChannel = make(chan []byte)
 	snowflake.answerChannel = make(chan []byte)
 	heap.Push(ctx.snowflakes, snowflake)
@@ -159,7 +162,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	sid, err := messages.DecodePollRequest(body)
+	sid, ptype, err := messages.DecodePollRequest(body)
 	if err != nil {
 		w.WriteHeader(http.StatusBadRequest)
 		return
@@ -174,7 +177,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 	}
 
 	// Wait for a client to avail an offer to the snowflake, or timeout if nil.
-	offer := ctx.RequestOffer(sid)
+	offer := ctx.RequestOffer(sid, ptype)
 	var b []byte
 	if nil == offer {
 		ctx.metrics.proxyIdleCount++
@@ -286,16 +289,23 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 	s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
 
-	var browsers, standalones int
+	var webexts, browsers, standalones, unknowns int
 	for _, snowflake := range ctx.idToSnowflake {
-		if len(snowflake.id) < 16 {
+		if snowflake.ptype == "badge" {
 			browsers++
-		} else {
+		} else if snowflake.ptype == "webext" {
+			webexts++
+		} else if snowflake.ptype == "standalone" {
 			standalones++
+		} else {
+			unknowns++
 		}
+
 	}
 	s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
 	s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
+	s += fmt.Sprintf("\n\twebext proxies: %d", webexts)
+	s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns)
 	if _, err := w.Write([]byte(s)); err != nil {
 		log.Printf("writing proxy information returned error: %v ", err)
 	}
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
index c35c1d6..cb5f34f 100644
--- a/broker/snowflake-broker_test.go
+++ b/broker/snowflake-broker_test.go
@@ -29,7 +29,7 @@ func TestBroker(t *testing.T) {
 		Convey("Adds Snowflake", func() {
 			So(ctx.snowflakes.Len(), ShouldEqual, 0)
 			So(len(ctx.idToSnowflake), ShouldEqual, 0)
-			ctx.AddSnowflake("foo")
+			ctx.AddSnowflake("foo", "")
 			So(ctx.snowflakes.Len(), ShouldEqual, 1)
 			So(len(ctx.idToSnowflake), ShouldEqual, 1)
 		})
@@ -55,7 +55,7 @@ func TestBroker(t *testing.T) {
 		Convey("Request an offer from the Snowflake Heap", func() {
 			done := make(chan []byte)
 			go func() {
-				offer := ctx.RequestOffer("test")
+				offer := ctx.RequestOffer("test", "")
 				done <- offer
 			}()
 			request := <-ctx.proxyPolls
@@ -79,7 +79,7 @@ func TestBroker(t *testing.T) {
 			Convey("with a proxy answer if available.", func() {
 				done := make(chan bool)
 				// Prepare a fake proxy to respond with.
-				snowflake := ctx.AddSnowflake("fake")
+				snowflake := ctx.AddSnowflake("fake", "")
 				go func() {
 					clientOffers(ctx, w, r)
 					done <- true
@@ -97,7 +97,7 @@ func TestBroker(t *testing.T) {
 					return
 				}
 				done := make(chan bool)
-				snowflake := ctx.AddSnowflake("fake")
+				snowflake := ctx.AddSnowflake("fake", "")
 				go func() {
 					clientOffers(ctx, w, r)
 					// Takes a few seconds here...
@@ -147,7 +147,7 @@ func TestBroker(t *testing.T) {
 		})
 
 		Convey("Responds to proxy answers...", func() {
-			s := ctx.AddSnowflake("test")
+			s := ctx.AddSnowflake("test", "")
 			w := httptest.NewRecorder()
 			data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))
 
@@ -211,7 +211,7 @@ func TestBroker(t *testing.T) {
 		// Manually do the Broker goroutine action here for full control.
 		p := <-ctx.proxyPolls
 		So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp")
-		s := ctx.AddSnowflake(p.id)
+		s := ctx.AddSnowflake(p.id, "")
 		go func() {
 			offer := <-s.offerChannel
 			p.offerChannel <- offer
@@ -449,7 +449,7 @@ func TestMetrics(t *testing.T) {
 			So(err, ShouldBeNil)
 
 			// Prepare a fake proxy to respond with.
-			snowflake := ctx.AddSnowflake("fake")
+			snowflake := ctx.AddSnowflake("fake", "")
 			go func() {
 				clientOffers(ctx, w, r)
 				done <- true
diff --git a/broker/snowflake-heap.go b/broker/snowflake-heap.go
index 419956f..cf209ec 100644
--- a/broker/snowflake-heap.go
+++ b/broker/snowflake-heap.go
@@ -10,6 +10,7 @@ over the offer and answer channels.
 */
 type Snowflake struct {
 	id            string
+	ptype         string
 	offerChannel  chan []byte
 	answerChannel chan []byte
 	clients       int
diff --git a/common/messages/proxy.go b/common/messages/proxy.go
index 042caf9..7ebab1d 100644
--- a/common/messages/proxy.go
+++ b/common/messages/proxy.go
@@ -6,16 +6,18 @@ package messages
 import (
 	"encoding/json"
 	"fmt"
+	"strings"
 )
 
-const version = "1.0"
+const version = "1.1"
 
-/* Version 1.0 specification:
+/* Version 1.1 specification:
 
 == ProxyPollRequest ==
 {
-  Sid: [generated session id of proxy]
-  Version: 1.0
+  Sid: [generated session id of proxy],
+  Version: 1.1,
+  Type: [badge|webext|standalone]
 }
 
 == ProxyPollResponse ==
@@ -41,11 +43,11 @@ HTTP 400 BadRequest
 
 == ProxyAnswerRequest ==
 {
-  Sid: [generated session id of proxy]
-  Version: 1.0
+  Sid: [generated session id of proxy],
+  Version: 1.1,
   Answer:
   {
-    type: answer
+    type: answer,
     sdp: [WebRTC SDP]
   }
 }
@@ -73,34 +75,38 @@ HTTP 400 BadRequest
 type ProxyPollRequest struct {
 	Sid     string
 	Version string
+	Type    string
 }
 
-func EncodePollRequest(sid string) ([]byte, error) {
+func EncodePollRequest(sid string, ptype string) ([]byte, error) {
 	return json.Marshal(ProxyPollRequest{
 		Sid:     sid,
 		Version: version,
+		Type:    ptype,
 	})
 }
 
 // Decodes a poll message from a snowflake proxy and returns the
 // sid of the proxy on success and an error if it failed
-func DecodePollRequest(data []byte) (string, error) {
+func DecodePollRequest(data []byte) (string, string, error) {
 	var message ProxyPollRequest
 
 	err := json.Unmarshal(data, &message)
 	if err != nil {
-		return "", err
+		return "", "", err
 	}
-	if message.Version != "1.0" {
-		return "", fmt.Errorf("using unknown version")
+
+	majorVersion := strings.Split(message.Version, ".")[0]
+	if majorVersion != "1" {
+		return "", "", fmt.Errorf("using unknown version")
 	}
 
-	// Version 1.0 requires an Sid
+	// Version 1.x requires an Sid
 	if message.Sid == "" {
-		return "", fmt.Errorf("no supplied session id")
+		return "", "", fmt.Errorf("no supplied session id")
 	}
 
-	return message.Sid, nil
+	return message.Sid, message.Type, nil
 }
 
 type ProxyPollResponse struct {
@@ -153,7 +159,7 @@ type ProxyAnswerRequest struct {
 
 func EncodeAnswerRequest(answer string, sid string) ([]byte, error) {
 	return json.Marshal(ProxyAnswerRequest{
-		Version: "1.0",
+		Version: "1.1",
 		Sid:     sid,
 		Answer:  answer,
 	})
@@ -167,7 +173,9 @@ func DecodeAnswerRequest(data []byte) (string, string, error) {
 	if err != nil {
 		return "", "", err
 	}
-	if message.Version != "1.0" {
+
+	majorVersion := strings.Split(message.Version, ".")[0]
+	if majorVersion != "1" {
 		return "", "", fmt.Errorf("using unknown version")
 	}
 
diff --git a/common/messages/proxy_test.go b/common/messages/proxy_test.go
index f2f006e..83553a0 100644
--- a/common/messages/proxy_test.go
+++ b/common/messages/proxy_test.go
@@ -11,45 +11,60 @@ import (
 func TestDecodeProxyPollRequest(t *testing.T) {
 	Convey("Context", t, func() {
 		for _, test := range []struct {
-			sid  string
-			data string
-			err  error
+			sid   string
+			ptype string
+			data  string
+			err   error
 		}{
 			{
 				//Version 1.0 proxy message
 				"ymbcCMto7KHNGYlp",
+				"",
 				`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`,
 				nil,
 			},
 			{
+				//Version 1.1 proxy message
+				"ymbcCMto7KHNGYlp",
+				"standalone",
+				`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.1","Type":"standalone"}`,
+				nil,
+			},
+			{
 				//Version 0.X proxy message:
 				"",
+				"",
 				"ymbcCMto7KHNGYlp",
 				&json.SyntaxError{},
 			},
 			{
 				"",
+				"",
 				`{"Sid":"ymbcCMto7KHNGYlp"}`,
 				fmt.Errorf(""),
 			},
 			{
 				"",
+				"",
 				"{}",
 				fmt.Errorf(""),
 			},
 			{
 				"",
+				"",
 				`{"Version":"1.0"}`,
 				fmt.Errorf(""),
 			},
 			{
 				"",
+				"",
 				`{"Version":"2.0"}`,
 				fmt.Errorf(""),
 			},
 		} {
-			sid, err := DecodePollRequest([]byte(test.data))
+			sid, ptype, err := DecodePollRequest([]byte(test.data))
 			So(sid, ShouldResemble, test.sid)
+			So(ptype, ShouldResemble, test.ptype)
 			So(err, ShouldHaveSameTypeAs, test.err)
 		}
 
@@ -58,10 +73,11 @@ func TestDecodeProxyPollRequest(t *testing.T) {
 
 func TestEncodeProxyPollRequests(t *testing.T) {
 	Convey("Context", t, func() {
-		b, err := EncodePollRequest("ymbcCMto7KHNGYlp")
+		b, err := EncodePollRequest("ymbcCMto7KHNGYlp", "standalone")
 		So(err, ShouldEqual, nil)
-		sid, err := DecodePollRequest(b)
+		sid, ptype, err := DecodePollRequest(b)
 		So(sid, ShouldEqual, "ymbcCMto7KHNGYlp")
+		So(ptype, ShouldEqual, "standalone")
 		So(err, ShouldEqual, nil)
 	})
 }
diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go
index c10093a..dce7b70 100644
--- a/proxy-go/snowflake.go
+++ b/proxy-go/snowflake.go
@@ -175,7 +175,7 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
 			timeOfNextPoll = now
 		}
 
-		body, err := messages.EncodePollRequest(sid)
+		body, err := messages.EncodePollRequest(sid, "standalone")
 		if err != nil {
 			log.Printf("Error encoding poll message: %s", err.Error())
 			return nil





More information about the tor-commits mailing list