[tor-commits] [snowflake/main] Broker /amp/client route (AMP cache client registration).

dcf at torproject.org dcf at torproject.org
Thu Aug 5 22:18:28 UTC 2021


commit e833119befa052e4837fe147f8bc2766a4ca7c54
Author: David Fifield <david at bamsoftware.com>
Date:   Sun Jul 18 23:37:41 2021 -0600

    Broker /amp/client route (AMP cache client registration).
---
 broker/amp.go                   | 76 +++++++++++++++++++++++++++++++++++++++
 broker/broker.go                |  2 ++
 broker/snowflake-broker_test.go | 78 +++++++++++++++++++++++++++++++++++++++--
 3 files changed, 154 insertions(+), 2 deletions(-)

diff --git a/broker/amp.go b/broker/amp.go
new file mode 100644
index 0000000..8641e51
--- /dev/null
+++ b/broker/amp.go
@@ -0,0 +1,76 @@
+package main
+
+import (
+	"log"
+	"net/http"
+	"strings"
+
+	"git.torproject.org/pluggable-transports/snowflake.git/common/amp"
+	"git.torproject.org/pluggable-transports/snowflake.git/common/messages"
+)
+
+// ampClientOffers is the AMP-speaking endpoint for client poll messages,
+// intended for access via an AMP cache. In contrast to the other clientOffers,
+// the client's encoded poll message is stored in the URL path rather than the
+// HTTP request body (because an AMP cache does not support POST), and the
+// encoded client poll response is sent back as AMP-armored HTML.
+func ampClientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
+	// The encoded client poll message immediately follows the /amp/client/
+	// path prefix, so this function unfortunately needs to be aware of and
+	// remove its own routing prefix.
+	path := strings.TrimPrefix(r.URL.Path, "/amp/client/")
+	if path == r.URL.Path {
+		// The path didn't start with the expected prefix. This probably
+		// indicates an internal bug.
+		log.Println("ampClientOffers: unexpected prefix in path")
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	var encPollReq []byte
+	var response []byte
+	var err error
+
+	encPollReq, err = amp.DecodePath(path)
+	if err == nil {
+		arg := messages.Arg{
+			Body:       encPollReq,
+			RemoteAddr: "",
+		}
+		err = i.ClientOffers(arg, &response)
+	} else { // Undecodable path: answer with an encoded error message, not an HTTP error.
+		response, err = (&messages.ClientPollResponse{
+			Error: "cannot decode URL path",
+		}).EncodePollResponse()
+	}
+
+	if err != nil {
+		// We couldn't even construct a JSON object containing an error
+		// message :( Nothing to do but signal an error at the HTTP
+		// layer. The AMP cache will translate this 500 status into a
+		// 404 status.
+		// https://amp.dev/documentation/guides-and-tutorials/learn/amp-caches-and-cors/amp-cache-urls/#redirect-%26-error-handling
+		log.Printf("ampClientOffers: %v", err)
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	w.Header().Set("Content-Type", "text/html")
+	// Attempt to hint to an AMP cache not to waste resources caching this
+	// document. "The Google AMP Cache considers any document fresh for at
+	// least 15 seconds."
+	// https://developers.google.com/amp/cache/overview#google-amp-cache-updates
+	w.Header().Set("Cache-Control", "max-age=15")
+	w.WriteHeader(http.StatusOK) // Status is sent here; failures below are only logged.
+
+	enc, err := amp.NewArmorEncoder(w)
+	if err != nil {
+		log.Printf("amp.NewArmorEncoder: %v", err)
+		return
+	}
+	defer enc.Close() // Any value returned by Close is discarded.
+
+	if _, err := enc.Write(response); err != nil {
+		log.Printf("ampClientOffers: unable to write answer: %v", err)
+	}
+}
diff --git a/broker/broker.go b/broker/broker.go
index 437a4d1..6c855f3 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -218,6 +218,8 @@ func main() {
 	http.Handle("/metrics", MetricsHandler{metricsFilename, metricsHandler})
 	http.Handle("/prometheus", promhttp.HandlerFor(ctx.metrics.promMetrics.registry, promhttp.HandlerOpts{}))
 
+	http.Handle("/amp/client/", SnowflakeHandler{i, ampClientOffers})
+
 	server := http.Server{
 		Addr: addr,
 	}
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
index 9e1c9f1..233cfea 100644
--- a/broker/snowflake-broker_test.go
+++ b/broker/snowflake-broker_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"container/heap"
+	"io"
 	"io/ioutil"
 	"log"
 	"net"
@@ -13,6 +14,7 @@ import (
 	"testing"
 	"time"
 
+	"git.torproject.org/pluggable-transports/snowflake.git/common/amp"
 	. "github.com/smartystreets/goconvey/convey"
 )
 
@@ -24,6 +26,15 @@ func NullLogger() *log.Logger {
 
 var promOnce sync.Once
 
+func decodeAMPArmorToString(r io.Reader) (string, error) { // test helper: AMP-armored reader -> decoded string
+	dec, err := amp.NewArmorDecoder(r)
+	if err != nil {
+		return "", err
+	}
+	p, err := ioutil.ReadAll(dec) // read everything the decoder yields
+	return string(p), err // whatever was read is returned alongside err, even if err is non-nil
+}
+
 func TestBroker(t *testing.T) {
 
 	Convey("Context", t, func() {
@@ -69,7 +80,7 @@ func TestBroker(t *testing.T) {
 			So(offer.sdp, ShouldResemble, []byte("test offer"))
 		})
 
-		Convey("Responds to client offers...", func() {
+		Convey("Responds to HTTP client offers...", func() {
 			w := httptest.NewRecorder()
 			data := bytes.NewReader(
 				[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}"))
@@ -117,7 +128,7 @@ func TestBroker(t *testing.T) {
 			})
 		})
 
-		Convey("Responds to legacy client offers...", func() {
+		Convey("Responds to HTTP legacy client offers...", func() {
 			w := httptest.NewRecorder()
 			data := bytes.NewReader([]byte("{test}"))
 			r, err := http.NewRequest("POST", "snowflake.broker/client", data)
@@ -165,6 +176,69 @@ func TestBroker(t *testing.T) {
 
 		})
 
+		Convey("Responds to AMP client offers...", func() {
+			w := httptest.NewRecorder()
+			encPollReq := []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")
+			r, err := http.NewRequest("GET", "/amp/client/"+amp.EncodePath(encPollReq), nil)
+			So(err, ShouldBeNil)
+
+			Convey("with status 200 when request is badly formatted.", func() {
+				r, err := http.NewRequest("GET", "/amp/client/bad", nil)
+				So(err, ShouldBeNil)
+				ampClientOffers(i, w, r)
+				body, err := decodeAMPArmorToString(w.Body)
+				So(err, ShouldBeNil)
+				So(body, ShouldEqual, `{"error":"cannot decode URL path"}`)
+			})
+
+			Convey("with error when no snowflakes are available.", func() {
+				ampClientOffers(i, w, r)
+				So(w.Code, ShouldEqual, http.StatusOK)
+				body, err := decodeAMPArmorToString(w.Body)
+				So(err, ShouldBeNil)
+				So(body, ShouldEqual, `{"error":"no snowflake proxies currently available"}`)
+			})
+
+			Convey("with a proxy answer if available.", func() {
+				done := make(chan bool)
+				// Prepare a fake proxy to respond with.
+				snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
+				go func() {
+					ampClientOffers(i, w, r)
+					done <- true
+				}()
+				offer := <-snowflake.offerChannel
+				So(offer.sdp, ShouldResemble, []byte("fake"))
+				snowflake.answerChannel <- "fake answer"
+				<-done
+				body, err := decodeAMPArmorToString(w.Body)
+				So(err, ShouldBeNil)
+				So(body, ShouldEqual, `{"answer":"fake answer"}`)
+				So(w.Code, ShouldEqual, http.StatusOK)
+			})
+
+			Convey("Times out when no proxy responds.", func() {
+				if testing.Short() {
+					return
+				}
+				done := make(chan bool)
+				snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
+				go func() {
+					ampClientOffers(i, w, r)
+					// Takes a few seconds here...
+					done <- true
+				}()
+				offer := <-snowflake.offerChannel
+				So(offer.sdp, ShouldResemble, []byte("fake"))
+				<-done
+				So(w.Code, ShouldEqual, http.StatusOK)
+				body, err := decodeAMPArmorToString(w.Body)
+				So(err, ShouldBeNil)
+				So(body, ShouldEqual, `{"error":"timed out waiting for answer!"}`)
+			})
+
+		})
+
 		Convey("Responds to proxy polls...", func() {
 			done := make(chan bool)
 			w := httptest.NewRecorder()





More information about the tor-commits mailing list