[tor-commits] [snowflake/master] log traffic bytes only once every few seconds, along with OnMessage & datachannel.Send counts, to prevent flooded logs
serene at torproject.org
serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016
commit c4215b5614210ec81d728d834f42754efb6d5130
Author: Serene Han <keroserene+git at gmail.com>
Date: Thu Feb 18 14:15:22 2016 -0800
log traffic bytes only once every few seconds, along with OnMessage & datachannel.Send counts, to prevent flooded logs
---
client/client_test.go | 4 +++
client/snowflake.go | 17 +++++++++----
client/util.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 83 insertions(+), 5 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index ff15d6d..6ee36d9 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -24,6 +24,10 @@ func TestConnect(t *testing.T) {
Convey("WebRTC Connection", func() {
c := new(webRTCConn)
+ c.BytesInfo = &BytesInfo{
+ inboundChan: make(chan int), outboundChan: make(chan int),
+ inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
+ }
So(c.buffer.Bytes(), ShouldEqual, nil)
Convey("SendData buffers when datachannel is nil", func() {
diff --git a/client/snowflake.go b/client/snowflake.go
index 081cb3b..eba68eb 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -67,6 +67,7 @@ type webRTCConn struct {
writePipe *io.PipeWriter
buffer bytes.Buffer
reset chan struct{}
+ *BytesInfo
}
var webrtcRemote *webRTCConn
@@ -164,12 +165,12 @@ func (c *webRTCConn) EstablishDataChannel() error {
dc.OnOpen = func() {
log.Println("WebRTC: DataChannel.OnOpen")
// if nil != c.snowflake {
- // panic("PeerConnection snowflake already exists.")
+ // panic("PeerConnection snowflake already exists.")
// }
// Flush the buffer, then enable datachannel.
// TODO: Make this more safe
dc.Send(c.buffer.Bytes())
- log.Println("Flushed ", c.buffer.Len(), " bytes")
+ log.Println("Flushed", c.buffer.Len(), "bytes")
c.buffer.Reset()
c.snowflake = dc
}
@@ -184,7 +185,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
}
}
dc.OnMessage = func(msg []byte) {
- log.Printf("OnMessage <--- %d bytes", len(msg))
+ c.BytesInfo.AddInbound(len(msg))
n, err := c.writePipe.Write(msg)
if err != nil {
// TODO: Maybe shouldn't actually close.
@@ -247,6 +248,7 @@ func (c *webRTCConn) ReceiveAnswer() {
}
func (c *webRTCConn) sendData(data []byte) {
+ c.BytesInfo.AddOutbound(len(data))
// Buffer the data in case datachannel isn't available yet.
if nil == c.snowflake {
log.Printf("Buffered %d bytes --> WebRTC", len(data))
@@ -256,10 +258,9 @@ func (c *webRTCConn) sendData(data []byte) {
// Otherwise, flush buffer if necessary.
for c.buffer.Len() > 0 {
c.snowflake.Send(c.buffer.Bytes())
- log.Println("Flushed ", c.buffer.Len(), " bytes")
+ log.Println("Flushed", c.buffer.Len(), "bytes")
c.buffer.Reset()
}
- log.Printf("Write %d bytes --> WebRTC", len(data))
c.snowflake.Send(data)
}
@@ -297,6 +298,12 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
connection.answerChannel = make(chan *webrtc.SessionDescription)
connection.errorChannel = make(chan error)
connection.reset = make(chan struct{})
+ connection.BytesInfo = &BytesInfo{
+ inboundChan: make(chan int), outboundChan: make(chan int),
+ inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
+ }
+ go connection.BytesInfo.Log()
+
// Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe()
diff --git a/client/util.go b/client/util.go
new file mode 100644
index 0000000..fa04220
--- /dev/null
+++ b/client/util.go
@@ -0,0 +1,67 @@
+package main
+
+import (
+ "log"
+ "time"
+)
+
+type BytesInfo struct {
+ outboundChan chan int
+ inboundChan chan int
+ outbound int
+ inbound int
+ outEvents int
+ inEvents int
+ isLogging bool
+}
+
+func (b *BytesInfo) Log() {
+ b.isLogging = true
+ var amount int
+ output := func() {
+ log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)",
+ b.inbound, b.outbound, b.inEvents, b.outEvents)
+ b.outbound = 0
+ b.outEvents = 0
+ b.inbound = 0
+ b.inEvents = 0
+ }
+ last := time.Now()
+ for {
+ select {
+ case amount = <-b.outboundChan:
+ b.outbound += amount
+ b.outEvents++
+ last := time.Now()
+ if time.Since(last) > time.Second*5 {
+ last = time.Now()
+ output()
+ }
+ case amount = <-b.inboundChan:
+ b.inbound += amount
+ b.inEvents++
+ if time.Since(last) > time.Second*5 {
+ last = time.Now()
+ output()
+ }
+ case <-time.After(time.Second * 5):
+ if b.inEvents > 0 || b.outEvents > 0 {
+ output()
+ }
+ }
+ }
+}
+
+func (b *BytesInfo) AddOutbound(amount int) {
+ if !b.isLogging {
+ return
+ }
+ b.outboundChan <- amount
+}
+
+func (b *BytesInfo) AddInbound(amount int) {
+ if !b.isLogging {
+ return
+ }
+ b.inboundChan <- amount
+}
More information about the tor-commits
mailing list