[tor-commits] [nyx/master] Rewritten, more performant log deduplication

atagar at torproject.org
Tue May 5 05:42:06 UTC 2015


commit f47bcd4289751f349fd047a5e801ad59da394b0c
Author: Damian Johnson <atagar at torproject.org>
Date:   Tue Apr 21 09:01:37 2015 -0700

    Rewritten, more performant log deduplication
    
    Our log deduplication was pretty grossly inefficient, doing O(n^2) operations
    every time we redrew the interface. This replaces that with an O(n) operation
    when we add log messages.
    
    This should both drop arm's CPU usage when reading messages at the DEBUG
    runlevel and allow us to always support deduplication (previously arm turned
    it off when it got too slow).
    
    This also moves deduplication to our util and adds tests (yay!). Next bit to
    port over is the daybreak handling...
---
 nyx/config/dedup.cfg           |    6 +-
 nyx/log_panel.py               |  131 ++++++++++------------------------------
 nyx/util/log.py                |   65 +++++++++++++++++++-
 test/util/log/deduplication.py |   27 ---------
 test/util/log/log_entry.py     |   27 +++++++++
 test/util/log/log_group.py     |   87 ++++++++++++++++++++++++++
 6 files changed, 214 insertions(+), 129 deletions(-)

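The gist of the change: rather than rescanning the whole backlog for duplicates
on every redraw, each message is compared against existing entries once, when it
is added. A minimal sketch of the two approaches (simplified stand-ins, not the
exact nyx API; the real matching is LogEntry.is_duplicate_of() in the diff
below):

  import collections

  Entry = collections.namedtuple('Entry', ['timestamp', 'type', 'message'])

  def is_duplicate_of(entry, other):
    # stand-in for LogEntry.is_duplicate_of(), which also consults dedup.cfg
    return entry.type == other.type and entry.message == other.message

  def dedup_on_redraw(events):
    # old approach: rescan the full backlog on every redraw, O(n^2)
    remaining, result = list(events), []

    while remaining:
      entry = remaining.pop(0)
      duplicates = [e for e in remaining if is_duplicate_of(entry, e)]
      remaining = [e for e in remaining if not is_duplicate_of(entry, e)]
      result.append((entry, len(duplicates)))

    return result

  def add(entries, new_entry):
    # new approach: one linear scan when each message arrives, after which
    # redraws just read the precomputed groupings
    for existing in entries:
      if is_duplicate_of(new_entry, existing):
        break  # the real code links new_entry into existing's duplicate group

    entries.insert(0, new_entry)
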
diff --git a/nyx/config/dedup.cfg b/nyx/config/dedup.cfg
index 06c4ff2..954e588 100644
--- a/nyx/config/dedup.cfg
+++ b/nyx/config/dedup.cfg
@@ -37,7 +37,9 @@
 # [NOTICE] I learned some more directory information, but not enough to build a
 #          circuit: We have only 469/2027 usable descriptors.
 # [NOTICE] Attempt by %s to open a stream from unknown relay. Closing.
-# [NOTICE] Bootstrapped 72%: Loading relay descriptors.
+# [NOTICE] Average packaged cell fullness: 70.976%. TLS write overhead: 11%
+# [NOTICE] Heartbeat: Tor's uptime is 8 days 6:00 hours, with 0 circuits open.
+#          I've sent 3.53 MB and received 90.61 MB.
 # [WARN] You specified a server "Amunet8" by name, but this name is not
 #        registered
 # [WARN] I have no descriptor for the router named "Amunet8" in my declared
@@ -79,6 +81,8 @@ dedup.NOTICE We stalled too much while trying to write
 dedup.NOTICE I learned some more directory information, but not enough to build a circuit
 dedup.NOTICE Attempt by
 dedup.NOTICE *Loading relay descriptors.
+dedup.NOTICE Average packaged cell fullness:
+dedup.NOTICE Heartbeat: Tor's uptime is
 dedup.WARN You specified a server
 dedup.WARN I have no descriptor for the router named
 dedup.WARN Controller gave us config lines that didn't validate
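 
 Judging from the entries above and the tests added later in this commit, plain
 dedup.* values match as message prefixes while a leading '*' matches anywhere
 in the message. A hypothetical helper showing that interpretation
 (matches_common_message and its arguments are illustrative, not nyx's actual
 API):
 
   def matches_common_message(runlevel, message, common_messages):
     # common_messages maps each runlevel to its dedup.cfg terms; terms with
     # a leading '*' match as substrings, everything else as a prefix
     for term in common_messages.get(runlevel, []):
       if term.startswith('*'):
         if term[1:] in message:
           return True
       elif message.startswith(term):
         return True
 
     return False
 
 For instance, 'Bootstrapped 72%: Loading relay descriptors.' matches the
 '*Loading relay descriptors.' entry even though the message doesn't start
 with it.
 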
diff --git a/nyx/log_panel.py b/nyx/log_panel.py
index ff64c78..f61d9c6 100644
--- a/nyx/log_panel.py
+++ b/nyx/log_panel.py
@@ -22,7 +22,7 @@ import nyx.popups
 
 from nyx import __version__
 from nyx.util import panel, tor_controller, ui_tools
-from nyx.util.log import LogEntry, read_tor_log
+from nyx.util.log import LogGroup, LogEntry, read_tor_log
 
 DAYBREAK_EVENT = 'DAYBREAK'  # special event for marking when the date changes
 TIMEZONE_OFFSET = time.altzone if time.localtime()[8] else time.timezone
@@ -70,12 +70,6 @@ CONTENT_HEIGHT_REDRAW_THRESHOLD = 3
 
 CACHED_DAYBREAKS_ARGUMENTS = (None, None)  # events, current day
 CACHED_DAYBREAKS_RESULT = None
-CACHED_DUPLICATES_ARGUMENTS = None  # events
-CACHED_DUPLICATES_RESULT = None
-
-# duration we'll wait for the deduplication function before giving up (in ms)
-
-DEDUPLICATION_TIMEOUT = 100
 
 # maximum number of regex filters we'll remember
 
@@ -147,59 +141,6 @@ def get_daybreaks(events, ignore_time_for_cache = False):
   return new_listing
 
 
-def get_duplicates(events):
-  """
-  Deduplicates a list of log entries, providing back a tuple listing with the
-  log entry and count of duplicates following it. Entries in different days are
-  not considered to be duplicates. This times out, returning None if it takes
-  longer than DEDUPLICATION_TIMEOUT.
-
-  Arguments:
-    events - chronologically ordered listing of events
-  """
-
-  global CACHED_DUPLICATES_ARGUMENTS, CACHED_DUPLICATES_RESULT
-
-  if CACHED_DUPLICATES_ARGUMENTS == events:
-    return list(CACHED_DUPLICATES_RESULT)
-
-  start_time = time.time()
-  events_remaining = list(events)
-  return_events = []
-
-  while events_remaining:
-    entry, duplicate_indices = events_remaining.pop(0), []
-
-    for i, earlier_entry in enumerate(events_remaining):
-      # if showing dates then do duplicate detection for each day, rather
-      # than globally
-
-      if earlier_entry.type == DAYBREAK_EVENT:
-        break
-
-      if entry.is_duplicate(earlier_entry):
-        duplicate_indices.append(i)
-
-    # checks if the call timeout has been reached
-
-    if (time.time() - start_time) > DEDUPLICATION_TIMEOUT / 1000.0:
-      return None
-
-    # drops duplicate entries
-
-    duplicate_indices.reverse()
-
-    for i in duplicate_indices:
-      del events_remaining[i]
-
-    return_events.append((entry, len(duplicate_indices)))
-
-  CACHED_DUPLICATES_ARGUMENTS = list(events)
-  CACHED_DUPLICATES_RESULT = list(return_events)
-
-  return return_events
-
-
 class LogPanel(panel.Panel, threading.Thread, logging.Handler):
   """
   Listens for and displays tor, nyx, and stem events. This can prepopulate
@@ -241,13 +182,14 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
 
     self.logged_events = self.set_event_listening(logged_events)
 
-    self.set_pause_attr('msg_log')       # tracks the message log when we're paused
-    self.msg_log = []                    # log entries, sorted by the timestamp
     self.regex_filter = None             # filter for presented log events (no filtering if None)
     self.last_content_height = 0         # height of the rendered content when last drawn
     self.log_file = None                 # file log messages are saved to (skipped if None)
     self.scroll = 0
 
+    self.set_pause_attr('_msg_log')
+    self._msg_log = LogGroup(CONFIG['cache.log_panel.size'])
+
     self._last_update = -1               # time the content was last revised
     self._halt = False                   # terminates thread if true
     self._cond = threading.Condition()   # used for pausing/resuming the thread
@@ -272,7 +214,7 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
 
     # leaving last_content_height as being too low causes initialization problems
 
-    self.last_content_height = len(self.msg_log)
+    self.last_content_height = len(self._msg_log)
 
     # adds listeners for tor and stem events
 
@@ -318,7 +260,7 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
     with self.vals_lock:
       # clears the event log
 
-      self.msg_log = []
+      self._msg_log = LogGroup(CONFIG['cache.log_panel.size'])
 
       # fetches past tor events from log file, if available
 
@@ -330,19 +272,14 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
 
         if logging_location:
           try:
-            for entry in read_tor_log(logging_location, read_limit):
+            for entry in reversed(list(read_tor_log(logging_location, read_limit))):
               if entry.type in set_runlevels:
-                self.msg_log.append(entry)
+                self._msg_log.add(entry.timestamp, entry.type, entry.message)
           except IOError as exc:
             log.info('Unable to read log located at %s: %s' % (logging_location, exc))
           except ValueError as exc:
             log.info(str(exc))
 
-      # crops events that are either too old, or more numerous than the caching size
-
-      if len(self.msg_log) > CONFIG['cache.log_panel.size']:
-        del self.msg_log[CONFIG['cache.log_panel.size']:]
-
   def set_duplicate_visability(self, is_visible):
     """
     Sets if duplicate log entries are collapsed or expanded.
@@ -392,17 +329,13 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
         self.log_file = None
 
     with self.vals_lock:
-      self.msg_log.insert(0, event)
-
-      if len(self.msg_log) > CONFIG['cache.log_panel.size']:
-        del self.msg_log[CONFIG['cache.log_panel.size']:]
+      self._msg_log.add(event.timestamp, event.type, event.message)
 
       # notifies the display that it has new content
 
       if not self.regex_filter or self.regex_filter.search(event.display_message):
-        self._cond.acquire()
-        self._cond.notifyAll()
-        self._cond.release()
+        with self._cond:
+          self._cond.notifyAll()
 
   def set_logged_events(self, event_types):
     """
@@ -544,7 +477,7 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
     """
 
     with self.vals_lock:
-      self.msg_log = []
+      self._msg_log = LogGroup(CONFIG['cache.log_panel.size'])
       self.redraw(True)
 
   def save_snapshot(self, path):
@@ -573,7 +506,7 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
 
     with self.vals_lock:
       try:
-        for entry in self.msg_log:
+        for entry in reversed(self._msg_log):
           is_visible = not self.regex_filter or self.regex_filter.search(entry.display_message)
 
           if is_visible:
@@ -655,10 +588,11 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
     contain up to two lines. Starts with newest entries.
     """
 
-    current_log = self.get_attr('msg_log')
+    event_log = self.get_attr('_msg_log')
 
     with self.vals_lock:
-      self._last_logged_events, self._last_update = list(current_log), time.time()
+      self._last_logged_events, self._last_update = event_log, time.time()
+      event_log = list(event_log)
 
       # draws the top label
 
@@ -684,16 +618,17 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
       seen_first_date_divider = False
       divider_attr, duplicate_attr = (curses.A_BOLD, 'yellow'), (curses.A_BOLD, 'green')
 
-      is_dates_shown = self.regex_filter is None and CONFIG['features.log.showDateDividers']
-      event_log = get_daybreaks(current_log, self.is_paused()) if is_dates_shown else list(current_log)
+      # TODO: fix daybreak handling
+      # is_dates_shown = self.regex_filter is None and CONFIG['features.log.showDateDividers']
+      # event_log = get_daybreaks(current_log, self.is_paused()) if is_dates_shown else current_log
 
       if not CONFIG['features.log.showDuplicateEntries']:
-        deduplicated_log = get_duplicates(event_log)
+        deduplicated_log = []
 
-        if deduplicated_log is None:
-          log.warn('Deduplication took too long. Its current implementation has difficulty handling large logs so disabling it to keep the interface responsive.')
-          self.set_duplicate_visability(True)
-          deduplicated_log = [(entry, 0) for entry in event_log]
+        for entry in event_log:
+          if not entry.is_duplicate:
+            duplicate_count = len(entry.duplicates) if entry.duplicates else 0
+            deduplicated_log.append((entry, duplicate_count))
       else:
         deduplicated_log = [(entry, 0) for entry in event_log]
 
@@ -843,18 +778,15 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
 
       sleep_time = 0
 
-      if (self.msg_log == self._last_logged_events and last_day == current_day) or self.is_paused():
+      if (self._msg_log == self._last_logged_events and last_day == current_day) or self.is_paused():
         sleep_time = 5
       elif time_since_reset < max_log_update_rate:
         sleep_time = max(0.05, max_log_update_rate - time_since_reset)
 
       if sleep_time:
-        self._cond.acquire()
-
-        if not self._halt:
-          self._cond.wait(sleep_time)
-
-        self._cond.release()
+        with self._cond:
+          if not self._halt:
+            self._cond.wait(sleep_time)
       else:
         last_day = current_day
         self.redraw(True)
@@ -869,10 +801,9 @@ class LogPanel(panel.Panel, threading.Thread, logging.Handler):
     Halts further resolutions and terminates the thread.
     """
 
-    self._cond.acquire()
-    self._halt = True
-    self._cond.notifyAll()
-    self._cond.release()
+    with self._cond:
+      self._halt = True
+      self._cond.notifyAll()
 
   def set_event_listening(self, events):
     """
diff --git a/nyx/util/log.py b/nyx/util/log.py
index 6e28e6c..545eb75 100644
--- a/nyx/util/log.py
+++ b/nyx/util/log.py
@@ -4,6 +4,7 @@ runlevels.
 """
 
 import time
+import threading
 
 import stem.util.conf
 import stem.util.log
@@ -40,6 +41,61 @@ def _common_log_messages():
   return messages
 
 
+class LogGroup(object):
+  """
+  Thread-safe collection of LogEntry instances, which maintains a certain size
+  and supports deduplication.
+  """
+
+  def __init__(self, max_size):
+    self._max_size = max_size
+    self._entries = []
+    self._lock = threading.RLock()
+
+  def add(self, timestamp, type, message):
+    entry = LogEntry(timestamp, type, message)
+
+    with self._lock:
+      duplicate = None
+
+      for existing_entry in self._entries:
+        if entry.is_duplicate_of(existing_entry):
+          duplicate = existing_entry
+          break
+
+      if duplicate:
+        if not duplicate.duplicates:
+          duplicate.duplicates = [duplicate]
+
+        duplicate.is_duplicate = True
+        entry.duplicates = duplicate.duplicates
+        entry.duplicates.insert(0, entry)
+
+      self._entries.insert(0, entry)
+
+      while len(self._entries) > self._max_size:
+        self.pop()
+
+  def pop(self):
+    with self._lock:
+      last_entry = self._entries.pop()
+
+      # By design if the last entry is a duplicate it will also be the last
+      # item in its duplicate group.
+
+      if last_entry.is_duplicate:
+        last_entry.duplicates.pop()
+
+  def __len__(self):
+    with self._lock:
+      return len(self._entries)
+
+  def __iter__(self):
+    with self._lock:
+      for entry in self._entries:
+        yield entry
+
+
 class LogEntry(object):
   """
   Individual tor or nyx log entry.
@@ -51,6 +107,10 @@ class LogEntry(object):
   :var str type: event type
   :var str message: event's message
   :var str display_message: message annotated with our time and runlevel
+
+  :var bool is_duplicate: true if this matches other messages in the group and
+    isn't the first
+  :var list duplicates: messages that are identical to this one
   """
 
   def __init__(self, timestamp, type, message):
@@ -61,8 +121,11 @@ class LogEntry(object):
     entry_time = time.localtime(self.timestamp)
     self.display_message = '%02i:%02i:%02i [%s] %s' % (entry_time[3], entry_time[4], entry_time[5], self.type, self.message)
 
+    self.is_duplicate = False
+    self.duplicates = None
+
   @lru_cache()
-  def is_duplicate(self, entry):
+  def is_duplicate_of(self, entry):
     """
     Checks if we are a duplicate of the given message or not.
 
diff --git a/test/util/log/deduplication.py b/test/util/log/deduplication.py
deleted file mode 100644
index fdd97f2..0000000
--- a/test/util/log/deduplication.py
+++ /dev/null
@@ -1,27 +0,0 @@
-import unittest
-
-from nyx.util.log import LogEntry
-
-
-class TestLogDeduplication(unittest.TestCase):
-  def test_matches_identical_messages(self):
-    # Simple case is that we match the same message but different timestamp.
-
-    entry = LogEntry(1333738434, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')
-    self.assertTrue(entry.is_duplicate(LogEntry(1333738457, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')))
-
-    # ... but we shouldn't match if the runlevel differs.
-
-    self.assertFalse(entry.is_duplicate(LogEntry(1333738457, 'DEBUG', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')))
-
-  def test_matches_based_on_prefix(self):
-    # matches using a prefix specified in dedup.cfg
-
-    entry = LogEntry(1333738434, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0007)')
-    self.assertTrue(entry.is_duplicate(LogEntry(1333738457, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0015)')))
-
-  def test_matches_with_wildcard(self):
-    # matches using a wildcard specified in dedup.cfg
-
-    entry = LogEntry(1333738434, 'NOTICE', 'Bootstrapped 72%: Loading relay descriptors.')
-    self.assertTrue(entry.is_duplicate(LogEntry(1333738457, 'NOTICE', 'Bootstrapped 55%: Loading relay descriptors.')))
diff --git a/test/util/log/log_entry.py b/test/util/log/log_entry.py
new file mode 100644
index 0000000..bd570f4
--- /dev/null
+++ b/test/util/log/log_entry.py
@@ -0,0 +1,27 @@
+import unittest
+
+from nyx.util.log import LogEntry
+
+
+class TestLogEntry(unittest.TestCase):
+  def test_deduplication_matches_identical_messages(self):
+    # Simple case is that we match the same message but different timestamp.
+
+    entry = LogEntry(1333738434, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')
+    self.assertTrue(entry.is_duplicate_of(LogEntry(1333738457, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')))
+
+    # ... but we shouldn't match if the runlevel differs.
+
+    self.assertFalse(entry.is_duplicate_of(LogEntry(1333738457, 'DEBUG', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')))
+
+  def test_deduplication_matches_based_on_prefix(self):
+    # matches using a prefix specified in dedup.cfg
+
+    entry = LogEntry(1333738434, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0007)')
+    self.assertTrue(entry.is_duplicate_of(LogEntry(1333738457, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0015)')))
+
+  def test_deduplication_matches_with_wildcard(self):
+    # matches using a wildcard specified in dedup.cfg
+
+    entry = LogEntry(1333738434, 'NOTICE', 'Bootstrapped 72%: Loading relay descriptors.')
+    self.assertTrue(entry.is_duplicate_of(LogEntry(1333738457, 'NOTICE', 'Bootstrapped 55%: Loading relay descriptors.')))
diff --git a/test/util/log/log_group.py b/test/util/log/log_group.py
new file mode 100644
index 0000000..732ee14
--- /dev/null
+++ b/test/util/log/log_group.py
@@ -0,0 +1,87 @@
+import unittest
+
+from nyx.util.log import LogGroup, LogEntry
+
+
+class TestLogGroup(unittest.TestCase):
+  def test_maintains_certain_size(self):
+    group = LogGroup(5)
+    self.assertEqual(0, len(group))
+
+    group.add(1333738410, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')
+    self.assertEqual([LogEntry(1333738410, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')], list(group))
+    self.assertEqual(1, len(group))
+
+    group.add(1333738420, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0007)')
+    group.add(1333738430, 'NOTICE', 'Bootstrapped 72%: Loading relay descriptors.')
+    group.add(1333738440, 'NOTICE', 'Bootstrapped 75%: Loading relay descriptors.')
+    group.add(1333738450, 'NOTICE', 'Bootstrapped 78%: Loading relay descriptors.')
+    self.assertEqual(5, len(group))
+
+    # group should now be full, adding more entries pops others off
+
+    group.add(1333738460, 'NOTICE', 'Bootstrapped 80%: Loading relay descriptors.')
+    self.assertFalse(LogEntry(1333738410, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"') in list(group))
+    self.assertEqual(5, len(group))
+
+    # try adding a bunch that will be deduplicated, and make sure we still maintain the size
+
+    group.add(1333738510, 'NOTICE', 'Bootstrapped 80%: Loading relay descriptors.')
+    group.add(1333738520, 'NOTICE', 'Bootstrapped 80%: Loading relay descriptors.')
+    group.add(1333738530, 'NOTICE', 'Bootstrapped 80%: Loading relay descriptors.')
+    group.add(1333738540, 'NOTICE', 'Bootstrapped 80%: Loading relay descriptors.')
+    group.add(1333738550, 'NOTICE', 'Bootstrapped 80%: Loading relay descriptors.')
+    group.add(1333738560, 'NOTICE', 'Bootstrapped 80%: Loading relay descriptors.')
+    group.add(1333738570, 'NOTICE', 'Bootstrapped 80%: Loading relay descriptors.')
+    self.assertEqual([1333738570, 1333738560, 1333738550, 1333738540, 1333738530], [e.timestamp for e in group])
+    self.assertEqual(5, len(group))
+
+  def test_deduplication(self):
+    group = LogGroup(5)
+    group.add(1333738410, 'NOTICE', 'Bootstrapped 72%: Loading relay descriptors.')
+    group.add(1333738420, 'NOTICE', 'Bootstrapped 75%: Loading relay descriptors.')
+    group.add(1333738430, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0007)')
+    group.add(1333738440, 'NOTICE', 'Bootstrapped 78%: Loading relay descriptors.')
+    group.add(1333738450, 'NOTICE', 'Bootstrapped 80%: Loading relay descriptors.')
+    self.assertEqual([1333738450, 1333738440, 1333738430, 1333738420, 1333738410], [e.timestamp for e in group])
+
+    bootstrap_messages = [
+      'Bootstrapped 80%: Loading relay descriptors.',
+      'Bootstrapped 78%: Loading relay descriptors.',
+      'Bootstrapped 75%: Loading relay descriptors.',
+      'Bootstrapped 72%: Loading relay descriptors.',
+    ]
+
+    group_items = list(group)
+    self.assertEqual(bootstrap_messages, [e.message for e in group_items[0].duplicates])
+    self.assertEqual([False, True, False, True, True], [e.is_duplicate for e in group_items])
+
+    # add another duplicate message that pops the last
+
+    group.add(1333738460, 'NOTICE', 'Bootstrapped 90%: Loading relay descriptors.')
+
+    bootstrap_messages = [
+      'Bootstrapped 90%: Loading relay descriptors.',
+      'Bootstrapped 80%: Loading relay descriptors.',
+      'Bootstrapped 78%: Loading relay descriptors.',
+      'Bootstrapped 75%: Loading relay descriptors.',
+    ]
+
+    group_items = list(group)
+    self.assertEqual(bootstrap_messages, [e.message for e in group_items[0].duplicates])
+    self.assertEqual([False, True, True, False, True], [e.is_duplicate for e in group_items])
+
+    # add another non-duplicate message that pops the last
+
+    group.add(1333738470, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')
+
+    bootstrap_messages = [
+      'Bootstrapped 90%: Loading relay descriptors.',
+      'Bootstrapped 80%: Loading relay descriptors.',
+      'Bootstrapped 78%: Loading relay descriptors.',
+    ]
+
+    group_items = list(group)
+    self.assertEqual(None, group_items[0].duplicates)
+    self.assertEqual(bootstrap_messages, [e.message for e in group_items[1].duplicates])
+    self.assertEqual([False, False, True, True, False], [e.is_duplicate for e in group_items])

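Based on the tests above, consuming the new LogGroup looks roughly like this
(a sketch, not part of the commit):

  from nyx.util.log import LogGroup

  group = LogGroup(100)  # oldest entries are popped once the cap is reached
  group.add(1333738410, 'NOTICE', 'Bootstrapped 72%: Loading relay descriptors.')
  group.add(1333738420, 'NOTICE', 'Bootstrapped 75%: Loading relay descriptors.')

  # Iteration is newest first. The newest entry of each duplicate group has
  # is_duplicate = False, and the group shares a common 'duplicates' list.

  for entry in group:
    if not entry.is_duplicate:
      count = len(entry.duplicates) if entry.duplicates else 0  # as the panel computes it
      print('%s (%i duplicates)' % (entry.display_message, count))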