[tor-commits] [tor/master] Ensure worker threads actually exit when it is time

nickm at torproject.org nickm at torproject.org
Fri Aug 21 14:37:17 UTC 2015


commit 32220d38c04bdb0aeccfcd67715e11a655eb297b
Author: Sebastian Hahn <sebastian at torproject.org>
Date:   Thu Aug 20 16:48:13 2015 +0200

    Ensure worker threads actually exit when it is time
    
    This includes a small refactoring to use a new enum (workqueue_reply_t)
    for the return values instead of just ints.
---
 changes/workqueue_reply_t |    6 ++++++
 src/common/workqueue.c    |   21 +++++++++++----------
 src/common/workqueue.h    |   19 ++++++++++---------
 src/or/cpuworker.c        |    4 ++--
 src/test/test_workqueue.c |    8 ++++----
 5 files changed, 33 insertions(+), 25 deletions(-)

diff --git a/changes/workqueue_reply_t b/changes/workqueue_reply_t
new file mode 100644
index 0000000..c2d3f4a
--- /dev/null
+++ b/changes/workqueue_reply_t
@@ -0,0 +1,6 @@
+  o Minor bugfixes:
+    - Ensure that worker threads actually exit when a fatal error or
+      shutdown is indicated. This doesn't currently affect the behaviour
+      of Tor, because Tor never indicates fatal error or shutdown except
+      in its unit tests. Fixes bug 16868; bugfix on 0.2.6.3-alpha.
+
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index b0b004d..c467bdf 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -25,7 +25,7 @@ struct threadpool_s {
   unsigned generation;
 
   /** Function that should be run for updates on each thread. */
-  int (*update_fn)(void *, void *);
+  workqueue_reply_t (*update_fn)(void *, void *);
   /** Function to free update arguments if they can't be run. */
   void (*free_update_arg_fn)(void *);
   /** Array of n_threads update arguments. */
@@ -56,7 +56,7 @@ struct workqueue_entry_s {
   /** True iff this entry is waiting for a worker to start processing it. */
   uint8_t pending;
   /** Function to run in the worker thread. */
-  int (*fn)(void *state, void *arg);
+  workqueue_reply_t (*fn)(void *state, void *arg);
   /** Function to run while processing the reply queue. */
   void (*reply_fn)(void *arg);
   /** Argument for the above functions. */
@@ -96,7 +96,7 @@ static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
  * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
  * thread. See threadpool_queue_work() for full documentation. */
 static workqueue_entry_t *
-workqueue_entry_new(int (*fn)(void*, void*),
+workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                     void (*reply_fn)(void*),
                     void *arg)
 {
@@ -172,7 +172,7 @@ worker_thread_main(void *thread_)
   workerthread_t *thread = thread_;
   threadpool_t *pool = thread->in_pool;
   workqueue_entry_t *work;
-  int result;
+  workqueue_reply_t result;
 
   tor_mutex_acquire(&pool->lock);
   while (1) {
@@ -182,13 +182,14 @@ worker_thread_main(void *thread_)
       if (thread->in_pool->generation != thread->generation) {
         void *arg = thread->in_pool->update_args[thread->index];
         thread->in_pool->update_args[thread->index] = NULL;
-        int (*update_fn)(void*,void*) = thread->in_pool->update_fn;
+        workqueue_reply_t (*update_fn)(void*,void*) =
+            thread->in_pool->update_fn;
         thread->generation = thread->in_pool->generation;
         tor_mutex_release(&pool->lock);
 
-        int r = update_fn(thread->state, arg);
+        workqueue_reply_t r = update_fn(thread->state, arg);
 
-        if (r < 0) {
+        if (r != WQ_RPL_REPLY) {
           return;
         }
 
@@ -208,7 +209,7 @@ worker_thread_main(void *thread_)
       queue_reply(thread->reply_queue, work);
 
       /* We may need to exit the thread. */
-      if (result >= WQ_RPL_ERROR) {
+      if (result != WQ_RPL_REPLY) {
         return;
       }
       tor_mutex_acquire(&pool->lock);
@@ -281,7 +282,7 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
  */
 workqueue_entry_t *
 threadpool_queue_work(threadpool_t *pool,
-                      int (*fn)(void *, void *),
+                      workqueue_reply_t (*fn)(void *, void *),
                       void (*reply_fn)(void *),
                       void *arg)
 {
@@ -318,7 +319,7 @@ threadpool_queue_work(threadpool_t *pool,
 int
 threadpool_queue_update(threadpool_t *pool,
                          void *(*dup_fn)(void *),
-                         int (*fn)(void *, void *),
+                         workqueue_reply_t (*fn)(void *, void *),
                          void (*free_fn)(void *),
                          void *arg)
 {
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
index 92e82b8..9ce1ead 100644
--- a/src/common/workqueue.h
+++ b/src/common/workqueue.h
@@ -15,21 +15,22 @@ typedef struct threadpool_s threadpool_t;
  * pool. */
 typedef struct workqueue_entry_s workqueue_entry_t;
 
-/** Possible return value from a work function: indicates success. */
-#define WQ_RPL_REPLY    0
-/** Possible return value from a work function: indicates fatal error */
-#define WQ_RPL_ERROR    1
-/** Possible return value from a work function: indicates thread is shutting
- * down. */
-#define WQ_RPL_SHUTDOWN 2
+/** Possible return value from a work function: */
+typedef enum {
+  WQ_RPL_REPLY = 0, /**< indicates success */
+  WQ_RPL_ERROR = 1, /**< indicates fatal error */
+  WQ_RPL_SHUTDOWN = 2, /**< indicates thread is shutting down */
+} workqueue_reply_t;
 
 workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
-                                         int (*fn)(void *, void *),
+                                         workqueue_reply_t (*fn)(void *,
+                                                                 void *),
                                          void (*reply_fn)(void *),
                                          void *arg);
+
 int threadpool_queue_update(threadpool_t *pool,
                             void *(*dup_fn)(void *),
-                            int (*fn)(void *, void *),
+                            workqueue_reply_t (*fn)(void *, void *),
                             void (*free_fn)(void *),
                             void *arg);
 void *workqueue_entry_cancel(workqueue_entry_t *pending_work);
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c
index d511ecf..76d97e0 100644
--- a/src/or/cpuworker.c
+++ b/src/or/cpuworker.c
@@ -160,7 +160,7 @@ typedef struct cpuworker_job_u {
   } u;
 } cpuworker_job_t;
 
-static int
+static workqueue_reply_t
 update_state_threadfn(void *state_, void *work_)
 {
   worker_state_t *state = state_;
@@ -387,7 +387,7 @@ cpuworker_onion_handshake_replyfn(void *work_)
 }
 
 /** Implementation function for onion handshake requests. */
-static int
+static workqueue_reply_t
 cpuworker_onion_handshake_threadfn(void *state_, void *work_)
 {
   worker_state_t *state = state_;
diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c
index 1d2cd94..b6a10bb 100644
--- a/src/test/test_workqueue.c
+++ b/src/test/test_workqueue.c
@@ -70,7 +70,7 @@ mark_handled(int serial)
 #endif
 }
 
-static int
+static workqueue_reply_t
 workqueue_do_rsa(void *state, void *work)
 {
   rsa_work_t *rw = work;
@@ -98,7 +98,7 @@ workqueue_do_rsa(void *state, void *work)
   return WQ_RPL_REPLY;
 }
 
-static int
+static workqueue_reply_t
 workqueue_do_shutdown(void *state, void *work)
 {
   (void)state;
@@ -108,7 +108,7 @@ workqueue_do_shutdown(void *state, void *work)
   return WQ_RPL_SHUTDOWN;
 }
 
-static int
+static workqueue_reply_t
 workqueue_do_ecdh(void *state, void *work)
 {
   ecdh_work_t *ew = work;
@@ -124,7 +124,7 @@ workqueue_do_ecdh(void *state, void *work)
   return WQ_RPL_REPLY;
 }
 
-static int
+static workqueue_reply_t
 workqueue_shutdown_error(void *state, void *work)
 {
   (void)state;





More information about the tor-commits mailing list