[tor-commits] [snowflake/master] prepare snowflake client for buffered datachannel writes, separate out dialWebRTC (#12)
serene at torproject.org
serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016
commit 760dee8a0f3efc1a71780bfde277ae7a5a7a6d9b
Author: Serene Han <keroserene+git at gmail.com>
Date: Wed Feb 17 17:39:09 2016 -0800
prepare snowflake client for buffered datachannel writes, separate out dialWebRTC (#12)
---
client/snowflake.go | 196 ++++++++++++++++++++++++++++++----------------------
1 file changed, 115 insertions(+), 81 deletions(-)
diff --git a/client/snowflake.go b/client/snowflake.go
index ba1561e..7c47fbb 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -28,7 +28,7 @@ var frontDomain string
// ends, -1 is written.
var handlerChan = make(chan int)
-var signalChan = make(chan *webrtc.SessionDescription)
+var answerChannel = make(chan *webrtc.SessionDescription)
func copyLoop(a, b net.Conn) {
var wg sync.WaitGroup
@@ -46,10 +46,16 @@ func copyLoop(a, b net.Conn) {
wg.Wait()
}
+// Implements net.Conn interface
type webRTCConn struct {
- pc *webrtc.PeerConnection
- dc *webrtc.DataChannel
- recvPipe *io.PipeReader
+ pc *webrtc.PeerConnection
+ dc *webrtc.DataChannel
+ broker *BrokerChannel
+ recvPipe *io.PipeReader
+ writePipe *io.PipeWriter
+ offerChannel chan *webrtc.SessionDescription
+ errorChannel chan error
+ openChannel chan struct{}
}
var webrtcRemote *webRTCConn
@@ -61,6 +67,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
func (c *webRTCConn) Write(b []byte) (int, error) {
// log.Printf("webrtc Write %d %+q", len(b), string(b))
log.Printf("Write %d bytes --> WebRTC", len(b))
+ // Buffer in case datachannel isn't available.
c.dc.Send(b)
return len(b), nil
}
@@ -90,18 +97,87 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
return fmt.Errorf("SetWriteDeadline not implemented")
}
-func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
- *webRTCConn, error) {
+// Create a WebRTC DataChannel locally.
+// This triggers "OnNegotiationNeeded" which should prepare an SDP offer.
+func (c *webRTCConn) EstablishDataChannel() error {
+ dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
+ if err != nil {
+ log.Printf("CreateDataChannel: %s", err)
+ return err
+ }
+ dc.OnOpen = func() {
+ log.Println("OnOpen channel")
+ c.openChannel <- struct{}{}
+ }
+ dc.OnClose = func() {
+ log.Println("OnClose channel")
+ // writePipe.Close()
+ close(c.openChannel)
+ // TODO: (Issue #12) Should attempt to renegotiate at this point.
+ }
+ dc.OnMessage = func(msg []byte) {
+ log.Printf("OnMessage <--- %d bytes", len(msg))
+ n, err := c.writePipe.Write(msg)
+ if err != nil {
+ // TODO: Maybe shouldn't actually close.
+ c.writePipe.CloseWithError(err)
+ }
+ if n != len(msg) {
+ panic("short write")
+ }
+ }
+ c.dc = dc
+ return nil
+}
- offerChan := make(chan *webrtc.SessionDescription)
- errChan := make(chan error)
- openChan := make(chan struct{})
+// Block until an offer is available, then send it to either
+// the Broker or signal pipe.
+func (c *webRTCConn) sendOffer() error {
+ select {
+ case offer := <-c.offerChannel:
+ if "" == brokerURL {
+ log.Printf("Please Copy & Paste the following to the peer:")
+ log.Printf("----------------")
+ fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n")
+ log.Printf("----------------")
+ return nil
+ }
+ // Use Broker...
+ go func() {
+ log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL,
+ "\nFront URL: ", frontDomain)
+ answer, err := c.broker.Negotiate(c.pc.LocalDescription())
+ if nil != err {
+ log.Printf("BrokerChannel signaling error: %s", err)
+ return
+ }
+ if nil == answer {
+ log.Printf("BrokerChannel: No answer received.")
+ return
+ // return errors.New("No answer received.")
+ }
+ answerChannel <- answer
+ }()
+ case err := <-c.errorChannel:
+ c.pc.Close()
+ return err
+ }
+ return nil
+}
+func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
+ *webRTCConn, error) {
pc, err := webrtc.NewPeerConnection(config)
if err != nil {
log.Printf("NewPeerConnection: %s", err)
return nil, err
}
+ connection := new(webRTCConn)
+ connection.broker = broker
+ connection.pc = pc
+ connection.offerChannel = make(chan *webrtc.SessionDescription)
+ connection.errorChannel = make(chan error)
+ connection.openChannel = make(chan struct{})
// Triggered by CreateDataChannel.
pc.OnNegotiationNeeded = func() {
@@ -109,12 +185,12 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
go func() {
offer, err := pc.CreateOffer()
if err != nil {
- errChan <- err
+ connection.errorChannel <- err
return
}
err = pc.SetLocalDescription(offer)
if err != nil {
- errChan <- err
+ connection.errorChannel <- err
return
}
}()
@@ -126,7 +202,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
pc.OnIceComplete = func() {
log.Printf("OnIceComplete")
- offerChan <- pc.LocalDescription()
+ connection.offerChannel <- pc.LocalDescription()
}
// This callback is not expected, as the Client initiates the creation
// of the data channel, not the remote peer.
@@ -135,62 +211,14 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
panic("OnDataChannel")
}
- pr, pw := io.Pipe()
+ // Pipes remain the same even when DataChannel gets switched.
+ connection.recvPipe, connection.writePipe = io.Pipe()
- dc, err := pc.CreateDataChannel("test", webrtc.Init{})
- if err != nil {
- log.Printf("CreateDataChannel: %s", err)
- return nil, err
- }
- dc.OnOpen = func() {
- log.Println("OnOpen channel")
- openChan <- struct{}{}
- }
- dc.OnClose = func() {
- log.Println("OnClose channel")
- pw.Close()
- close(openChan)
- // TODO: (Issue #12) Should attempt to renegotiate at this point.
- }
- dc.OnMessage = func(msg []byte) {
- log.Printf("OnMessage <--- %d bytes", len(msg))
- n, err := pw.Write(msg)
- if err != nil {
- pw.CloseWithError(err)
- }
- if n != len(msg) {
- panic("short write")
- }
- }
-
- select {
- case err := <-errChan:
- pc.Close()
- return nil, err
- case offer := <-offerChan:
- log.Printf("----------------")
- fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n")
- log.Printf("----------------")
- go func() {
- if "" != brokerURL {
- log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL,
- "\nFront URL: ", frontDomain)
- answer, err := broker.Negotiate(pc.LocalDescription())
- if nil != err {
- log.Printf("BrokerChannel signaling error: %s", err)
- }
- if nil == answer {
- log.Printf("BrokerChannel: No answer received.")
- } else {
- signalChan <- answer
- }
- }
- }()
- }
-
- log.Printf("waiting for answer")
- answer, ok := <-signalChan
+ connection.EstablishDataChannel()
+ connection.sendOffer()
+ log.Printf("waiting for answer...")
+ answer, ok := <-answerChannel
if !ok {
pc.Close()
return nil, fmt.Errorf("no answer received")
@@ -205,13 +233,13 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
// Wait until data channel is open; otherwise for example sends may get
// lost.
// TODO: Buffering *should* work though.
- _, ok = <-openChan
+ _, ok = <-connection.openChannel
if !ok {
pc.Close()
return nil, fmt.Errorf("failed to open data channel")
}
- return &webRTCConn{pc: pc, dc: dc, recvPipe: pr}, nil
+ return connection, nil
}
func endWebRTC() {
@@ -229,6 +257,7 @@ func endWebRTC() {
}
}
+// Establish a WebRTC channel for SOCKS connections.
func handler(conn *pt.SocksConn) error {
handlerChan <- 1
defer func() {
@@ -259,7 +288,6 @@ func handler(conn *pt.SocksConn) error {
}
copyLoop(conn, remote)
-
return nil
}
@@ -293,10 +321,10 @@ func readSignalingMessages(f *os.File) {
log.Printf("ignoring invalid signal message %+q", msg)
continue
}
- signalChan <- sdp
+ answerChannel <- sdp
}
- log.Printf("close signalChan")
- close(signalChan)
+ log.Printf("close answerChannel")
+ close(answerChannel)
if err := s.Err(); err != nil {
log.Printf("signal FIFO: %s", err)
}
@@ -308,19 +336,25 @@ func main() {
flag.StringVar(&brokerURL, "url", "", "URL of signaling broker")
flag.StringVar(&frontDomain, "front", "", "front domain")
flag.Parse()
-
logFile, err = os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
log.Fatal(err)
}
defer logFile.Close()
log.SetOutput(logFile)
- log.Println("starting")
-
- if "" == brokerURL {
+ log.Println("\nStarting Snowflake Client...")
+
+ // Expect user to copy-paste if no broker URL is given.
+ // TODO: Maybe just get rid of copy-paste entirely.
+ if "" != brokerURL {
+ log.Println("Rendezvous using Broker at: ", brokerURL)
+ if "" != frontDomain {
+ log.Println("Domain fronting using:", frontDomain)
+ }
+ } else {
log.Println("No HTTP signaling detected. Waiting for a \"signal\" pipe...")
// This FIFO receives signaling messages.
- err = syscall.Mkfifo("signal", 0600)
+ err := syscall.Mkfifo("signal", 0600)
if err != nil {
if err.(syscall.Errno) != syscall.EEXIST {
log.Fatal(err)
@@ -363,6 +397,7 @@ func main() {
}
}
pt.CmethodsDone()
+ defer endWebRTC()
var numHandlers int = 0
var sig os.Signal
@@ -382,10 +417,9 @@ func main() {
ln.Close()
}
- if syscall.SIGTERM == sig || syscall.SIGINT == sig {
- endWebRTC()
- return
- }
+ // if syscall.SIGTERM == sig || syscall.SIGINT == sig {
+ // return
+ // }
// wait for second signal or no more handlers
sig = nil
More information about the tor-commits mailing list