[tor-commits] [snowflake/main] Implement snowflake client lib as PTv2.1 Go API
cohosh at torproject.org
cohosh at torproject.org
Wed May 12 13:11:17 UTC 2021
commit e87b9175dd7559fccd665cd7eb4b6edecc231950
Author: Cecylia Bocovich <cohosh at torproject.org>
Date: Sat Mar 20 12:36:33 2021 -0400
Implement snowflake client lib as PTv2.1 Go API
This implements a Pluggable Transports v2.1-compatible Go API in the
Snowflake client library, and refactors how the main Snowflake program
calls it. The Go API implements the two required client-side functions:
a constructor that returns a Transport, and a Dial function for the
Transport that returns a net.Conn. See the PT specification for more
information:
https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/master/releases/PTSpecV2.1/Pluggable%20Transport%20Specification%20v2.1%20-%20Go%20Transport%20API.pdf
---
client/client_test.go | 59 ---------------
client/lib/lib_test.go | 55 ++++++++++----
client/lib/snowflake.go | 198 +++++++++++++++++++++++++++++++++---------------
client/snowflake.go | 106 ++++++++------------------
4 files changed, 211 insertions(+), 207 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
deleted file mode 100644
index 84e9cc1..0000000
--- a/client/client_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package main
-
-import (
- "testing"
-
- . "github.com/smartystreets/goconvey/convey"
-)
-
-func TestICEServerParser(t *testing.T) {
- Convey("Test parsing of ICE servers", t, func() {
- for _, test := range []struct {
- input string
- urls [][]string
- length int
- }{
- {
- "",
- nil,
- 0,
- },
- {
- " ",
- nil,
- 0,
- },
- {
- "stun:stun.l.google.com:19302",
- [][]string{[]string{"stun:stun.l.google.com:19302"}},
- 1,
- },
- {
- "stun:stun.l.google.com:19302,stun.ekiga.net",
- [][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
- 2,
- },
- {
- "stun:stun.l.google.com:19302, stun.ekiga.net",
- [][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
- 2,
- },
- } {
- servers := parseIceServers(test.input)
-
- if test.urls == nil {
- So(servers, ShouldBeNil)
- } else {
- So(servers, ShouldNotBeNil)
- }
-
- So(len(servers), ShouldEqual, test.length)
-
- for _, server := range servers {
- So(test.urls, ShouldContain, server.URLs)
- }
-
- }
-
- })
-}
diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go
index 5537a52..6140e0b 100644
--- a/client/lib/lib_test.go
+++ b/client/lib/lib_test.go
@@ -156,19 +156,6 @@ func TestSnowflakeClient(t *testing.T) {
})
- Convey("Snowflake", t, func() {
-
- SkipConvey("Handler Grants correctly", func() {
- socks := &FakeSocksConn{}
- broker := &BrokerChannel{Host: "test"}
- d := NewWebRTCDialer(broker, nil, 1)
-
- So(socks.rejected, ShouldEqual, false)
- Handler(socks, d)
- So(socks.rejected, ShouldEqual, true)
- })
- })
-
Convey("Dialers", t, func() {
Convey("Can construct WebRTCDialer.", func() {
broker := &BrokerChannel{Host: "test"}
@@ -267,3 +254,45 @@ func TestSnowflakeClient(t *testing.T) {
})
}
+
+func TestICEServerParser(t *testing.T) {
+ Convey("Test parsing of ICE servers", t, func() {
+ for _, test := range []struct {
+ input []string
+ urls [][]string
+ length int
+ }{
+ {
+ []string{"stun:stun.l.google.com:19302"},
+ [][]string{[]string{"stun:stun.l.google.com:19302"}},
+ 1,
+ },
+ {
+ []string{"stun:stun.l.google.com:19302", "stun.ekiga.net"},
+ [][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
+ 2,
+ },
+ {
+ []string{"stun:stun.l.google.com:19302", "stun.ekiga.net"},
+ [][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
+ 2,
+ },
+ } {
+ servers := parseIceServers(test.input)
+
+ if test.urls == nil {
+ So(servers, ShouldBeNil)
+ } else {
+ So(servers, ShouldNotBeNil)
+ }
+
+ So(len(servers), ShouldEqual, test.length)
+
+ for _, server := range servers {
+ So(test.urls, ShouldContain, server.URLs)
+ }
+
+ }
+
+ })
+}
diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go
index 2ed51a1..6e87b81 100644
--- a/client/lib/snowflake.go
+++ b/client/lib/snowflake.go
@@ -3,12 +3,15 @@ package lib
import (
"context"
"errors"
- "io"
"log"
+ "math/rand"
"net"
+ "strings"
"time"
+ "git.torproject.org/pluggable-transports/snowflake.git/common/nat"
"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
+ "github.com/pion/webrtc/v3"
"github.com/xtaci/kcp-go/v5"
"github.com/xtaci/smux"
)
@@ -25,6 +28,138 @@ type dummyAddr struct{}
func (addr dummyAddr) Network() string { return "dummy" }
func (addr dummyAddr) String() string { return "dummy" }
+// Transport is a structure with methods that conform to the Go PT v2.1 API
+// https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/master/releases/PTSpecV2.1/Pluggable%20Transport%20Specification%20v2.1%20-%20Go%20Transport%20API.pdf
+type Transport struct {
+ dialer *WebRTCDialer
+}
+
+// Create a new Snowflake transport client that can spawn multiple Snowflake connections.
+// brokerURL and frontDomain are the urls for the broker host and domain fronting host
+// iceAddresses are the STUN/TURN urls needed for WebRTC negotiation
+// keepLocalAddresses is a flag to enable sending local network addresses (for testing purposes)
+// max is the maximum number of snowflakes the client should gather for each SOCKS connection
+func NewSnowflakeClient(brokerURL, frontDomain string, iceAddresses []string, keepLocalAddresses bool, max int) (*Transport, error) {
+
+ log.Println("\n\n\n --- Starting Snowflake Client ---")
+
+ iceServers := parseIceServers(iceAddresses)
+ // chooses a random subset of servers from inputs
+ rand.Seed(time.Now().UnixNano())
+ rand.Shuffle(len(iceServers), func(i, j int) {
+ iceServers[i], iceServers[j] = iceServers[j], iceServers[i]
+ })
+ if len(iceServers) > 2 {
+ iceServers = iceServers[:(len(iceServers)+1)/2]
+ }
+ log.Printf("Using ICE servers:")
+ for _, server := range iceServers {
+ log.Printf("url: %v", strings.Join(server.URLs, " "))
+ }
+
+ // Use potentially domain-fronting broker to rendezvous.
+ broker, err := NewBrokerChannel(
+ brokerURL, frontDomain, CreateBrokerTransport(),
+ keepLocalAddresses)
+ if err != nil {
+ return nil, err
+ }
+ go updateNATType(iceServers, broker)
+
+ transport := &Transport{dialer: NewWebRTCDialer(broker, iceServers, max)}
+
+ return transport, nil
+}
+
+// Create a new Snowflake connection. Starts the collection of snowflakes and returns a
+// smux Stream.
+func (t *Transport) Dial() (net.Conn, error) {
+ // Prepare to collect remote WebRTC peers.
+ snowflakes, err := NewPeers(t.dialer)
+ if err != nil {
+ return nil, err
+ }
+
+ // Use a real logger to periodically output how much traffic is happening.
+ snowflakes.BytesLogger = NewBytesSyncLogger()
+
+ log.Printf("---- SnowflakeConn: begin collecting snowflakes ---")
+ go connectLoop(snowflakes)
+
+ // Create a new smux session
+ log.Printf("---- SnowflakeConn: starting a new session ---")
+ pconn, sess, err := newSession(snowflakes)
+ if err != nil {
+ return nil, err
+ }
+
+ // On the smux session we overlay a stream.
+ stream, err := sess.OpenStream()
+ if err != nil {
+ return nil, err
+ }
+
+ // Begin exchanging data.
+ log.Printf("---- SnowflakeConn: begin stream %v ---", stream.ID())
+ return &SnowflakeConn{Stream: stream, sess: sess, pconn: pconn, snowflakes: snowflakes}, nil
+}
+
+type SnowflakeConn struct {
+ *smux.Stream
+ sess *smux.Session
+ pconn net.PacketConn
+ snowflakes *Peers
+}
+
+func (conn *SnowflakeConn) Close() error {
+ log.Printf("---- SnowflakeConn: closed stream %v ---", conn.ID())
+ conn.Stream.Close()
+ log.Printf("---- SnowflakeConn: end collecting snowflakes ---")
+ conn.snowflakes.End()
+ conn.pconn.Close()
+ log.Printf("---- SnowflakeConn: discarding finished session ---")
+ conn.sess.Close()
+ return nil //TODO: return errors if any of the above do
+}
+
+// loop through all provided STUN servers until we exhaust the list or find
+// one that is compatible with RFC 5780
+func updateNATType(servers []webrtc.ICEServer, broker *BrokerChannel) {
+
+ var restrictedNAT bool
+ var err error
+ for _, server := range servers {
+ addr := strings.TrimPrefix(server.URLs[0], "stun:")
+ restrictedNAT, err = nat.CheckIfRestrictedNAT(addr)
+ if err == nil {
+ if restrictedNAT {
+ broker.SetNATType(nat.NATRestricted)
+ } else {
+ broker.SetNATType(nat.NATUnrestricted)
+ }
+ break
+ }
+ }
+ if err != nil {
+ broker.SetNATType(nat.NATUnknown)
+ }
+}
+
+// Returns a slice of webrtc.ICEServer given a slice of addresses
+func parseIceServers(addresses []string) []webrtc.ICEServer {
+ var servers []webrtc.ICEServer
+ if len(addresses) == 0 {
+ return nil
+ }
+ for _, url := range addresses {
+ url = strings.TrimSpace(url)
+ servers = append(servers, webrtc.ICEServer{
+ URLs: []string{url},
+ })
+ }
+ return servers
+}
+
// newSession returns a new smux.Session and the net.PacketConn it is running
// over. The net.PacketConn successively connects through Snowflake proxies
// pulled from snowflakes.
@@ -94,47 +229,6 @@ func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, e
return pconn, sess, err
}
-// Given an accepted SOCKS connection, establish a WebRTC connection to the
-// remote peer and exchange traffic.
-func Handler(socks net.Conn, tongue Tongue) error {
- // Prepare to collect remote WebRTC peers.
- snowflakes, err := NewPeers(tongue)
- if err != nil {
- return err
- }
-
- // Use a real logger to periodically output how much traffic is happening.
- snowflakes.BytesLogger = NewBytesSyncLogger()
-
- log.Printf("---- Handler: begin collecting snowflakes ---")
- go connectLoop(snowflakes)
-
- // Create a new smux session
- log.Printf("---- Handler: starting a new session ---")
- pconn, sess, err := newSession(snowflakes)
- if err != nil {
- return err
- }
-
- // On the smux session we overlay a stream.
- stream, err := sess.OpenStream()
- if err != nil {
- return err
- }
- defer stream.Close()
-
- // Begin exchanging data.
- log.Printf("---- Handler: begin stream %v ---", stream.ID())
- copyLoop(socks, stream)
- log.Printf("---- Handler: closed stream %v ---", stream.ID())
- snowflakes.End()
- log.Printf("---- Handler: end collecting snowflakes ---")
- pconn.Close()
- sess.Close()
- log.Printf("---- Handler: discarding finished session ---")
- return nil
-}
-
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
func connectLoop(snowflakes SnowflakeCollector) {
@@ -153,23 +247,3 @@ func connectLoop(snowflakes SnowflakeCollector) {
}
}
}
-
-// Exchanges bytes between two ReadWriters.
-// (In this case, between a SOCKS connection and smux stream.)
-func copyLoop(socks, stream io.ReadWriter) {
- done := make(chan struct{}, 2)
- go func() {
- if _, err := io.Copy(socks, stream); err != nil {
- log.Printf("copying WebRTC to SOCKS resulted in error: %v", err)
- }
- done <- struct{}{}
- }()
- go func() {
- if _, err := io.Copy(stream, socks); err != nil {
- log.Printf("copying SOCKS to stream resulted in error: %v", err)
- }
- done <- struct{}{}
- }()
- <-done
- log.Println("copy loop ended")
-}
diff --git a/client/snowflake.go b/client/snowflake.go
index d79de97..f19afcf 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -6,7 +6,6 @@ import (
"io"
"io/ioutil"
"log"
- "math/rand"
"net"
"os"
"os/signal"
@@ -14,21 +13,38 @@ import (
"strings"
"sync"
"syscall"
- "time"
pt "git.torproject.org/pluggable-transports/goptlib.git"
sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib"
- "git.torproject.org/pluggable-transports/snowflake.git/common/nat"
"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
- "github.com/pion/webrtc/v3"
)
const (
DefaultSnowflakeCapacity = 1
)
-// Accept local SOCKS connections and pass them to the handler.
-func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struct{}, wg *sync.WaitGroup) {
+// Exchanges bytes between two ReadWriters.
+// (In this case, between a SOCKS connection and a snowflake transport conn)
+func copyLoop(socks, sfconn io.ReadWriter) {
+ done := make(chan struct{}, 2)
+ go func() {
+ if _, err := io.Copy(socks, sfconn); err != nil {
+ log.Printf("copying Snowflake to SOCKS resulted in error: %v", err)
+ }
+ done <- struct{}{}
+ }()
+ go func() {
+ if _, err := io.Copy(sfconn, socks); err != nil {
+ log.Printf("copying SOCKS to Snowflake resulted in error: %v", err)
+ }
+ done <- struct{}{}
+ }()
+ <-done
+ log.Println("copy loop ended")
+}
+
+// Accept local SOCKS connections and connect to a Snowflake connection
+func socksAcceptLoop(ln *pt.SocksListener, transport *sf.Transport, shutdown chan struct{}, wg *sync.WaitGroup) {
defer ln.Close()
for {
conn, err := ln.AcceptSocks()
@@ -53,10 +69,14 @@ func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struc
handler := make(chan struct{})
go func() {
- err = sf.Handler(conn, tongue)
+ // pass an empty address because the broker chooses the bridge
+ sconn, err := transport.Dial()
if err != nil {
- log.Printf("handler error: %s", err)
+ log.Printf("dial error: %s", err)
}
+ // copy between the created Snowflake conn and the SOCKS conn
+ copyLoop(conn, sconn)
+ sconn.Close()
close(handler)
return
@@ -72,23 +92,6 @@ func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struc
}
}
-// s is a comma-separated list of ICE server URLs.
-func parseIceServers(s string) []webrtc.ICEServer {
- var servers []webrtc.ICEServer
- s = strings.TrimSpace(s)
- if len(s) == 0 {
- return nil
- }
- urls := strings.Split(s, ",")
- for _, url := range urls {
- url = strings.TrimSpace(url)
- servers = append(servers, webrtc.ICEServer{
- URLs: []string{url},
- })
- }
- return servers
-}
-
func main() {
iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers")
brokerURL := flag.String("url", "", "URL of signaling broker")
@@ -137,33 +140,13 @@ func main() {
log.SetOutput(&safelog.LogScrubber{Output: logOutput})
}
- log.Println("\n\n\n --- Starting Snowflake Client ---")
-
- iceServers := parseIceServers(*iceServersCommas)
- // chooses a random subset of servers from inputs
- rand.Seed(time.Now().UnixNano())
- rand.Shuffle(len(iceServers), func(i, j int) {
- iceServers[i], iceServers[j] = iceServers[j], iceServers[i]
- })
- if len(iceServers) > 2 {
- iceServers = iceServers[:(len(iceServers)+1)/2]
- }
- log.Printf("Using ICE servers:")
- for _, server := range iceServers {
- log.Printf("url: %v", strings.Join(server.URLs, " "))
- }
+ iceAddresses := strings.Split(strings.TrimSpace(*iceServersCommas), ",")
- // Use potentially domain-fronting broker to rendezvous.
- broker, err := sf.NewBrokerChannel(
- *brokerURL, *frontDomain, sf.CreateBrokerTransport(),
- *keepLocalAddresses || *oldKeepLocalAddresses)
+ transport, err := sf.NewSnowflakeClient(*brokerURL, *frontDomain, iceAddresses,
+ *keepLocalAddresses || *oldKeepLocalAddresses, *max)
if err != nil {
- log.Fatalf("parsing broker URL: %v", err)
+ log.Fatal("Failed to start snowflake transport: ", err)
}
- go updateNATType(iceServers, broker)
-
- // Create a new WebRTCDialer to use as the |Tongue| to catch snowflakes
- dialer := sf.NewWebRTCDialer(broker, iceServers, *max)
// Begin goptlib client process.
ptInfo, err := pt.ClientSetup(nil)
@@ -187,7 +170,7 @@ func main() {
break
}
log.Printf("Started SOCKS listener at %v.", ln.Addr())
- go socksAcceptLoop(ln, dialer, shutdown, &wg)
+ go socksAcceptLoop(ln, transport, shutdown, &wg)
pt.Cmethod(methodName, ln.Version(), ln.Addr())
listeners = append(listeners, ln)
default:
@@ -223,26 +206,3 @@ func main() {
wg.Wait()
log.Println("snowflake is done.")
}
-
-// loop through all provided STUN servers until we exhaust the list or find
-// one that is compatable with RFC 5780
-func updateNATType(servers []webrtc.ICEServer, broker *sf.BrokerChannel) {
-
- var restrictedNAT bool
- var err error
- for _, server := range servers {
- addr := strings.TrimPrefix(server.URLs[0], "stun:")
- restrictedNAT, err = nat.CheckIfRestrictedNAT(addr)
- if err == nil {
- if restrictedNAT {
- broker.SetNATType(nat.NATRestricted)
- } else {
- broker.SetNATType(nat.NATUnrestricted)
- }
- break
- }
- }
- if err != nil {
- broker.SetNATType(nat.NATUnknown)
- }
-}
More information about the tor-commits
mailing list