source: trunk/third/glib2/glib/gthreadpool.c @ 18159

Revision 18159, 18.6 KB checked in by ghudson, 22 years ago (diff)
This commit was generated by cvs2svn to compensate for changes in r18158, which included commits to RCS files with non-trunk default branches.
Line 
1/* GLIB - Library of useful routines for C programming
2 * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3 *
4 * GAsyncQueue: thread pool implementation.
5 * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
21 */
22
23/*
24 * MT safe
25 */
26
27#include "config.h"
28
29#include "glib.h"
30
31
32typedef struct _GRealThreadPool GRealThreadPool;
33
34struct _GRealThreadPool
35{
36  GThreadPool pool;
37  GAsyncQueue* queue;
38  gint max_threads;
39  gint num_threads;
40  gboolean running;
41  gboolean immediate;
42  gboolean waiting;
43};
44
45/* The following is just an address to mark the stop order for a
46 * thread, it could be any address (as long, as it isn't a valid
47 * GThreadPool address) */
48static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new;
49
50/* Here all unused threads are waiting  */
51static GAsyncQueue *unused_thread_queue;
52static gint unused_threads = 0;
53static gint max_unused_threads = 0;
54G_LOCK_DEFINE_STATIC (unused_threads);
55
56static GMutex *inform_mutex = NULL;
57static GCond *inform_cond = NULL;
58
59static void     g_thread_pool_free_internal (GRealThreadPool* pool);
60static gpointer g_thread_pool_thread_proxy (gpointer data);
61static void     g_thread_pool_start_thread (GRealThreadPool* pool,
62                                            GError **error);
63static void     g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool);
64
65#define g_thread_should_run(pool, len) \
66  ((pool)->running || (!(pool)->immediate && (len) > 0))
67
/* Main function of every pool thread; it is the function handed to
 * g_thread_create() by g_thread_pool_start_thread().
 *
 * @data is the GRealThreadPool the thread initially works for.  The
 * thread loops: it pops tasks from the queue of its current pool and
 * runs pool->pool.func on them.  Once it is no longer needed at that
 * pool it decrements pool->num_threads and parks on
 * unused_thread_queue until another pool (or stop_this_thread_marker)
 * is pushed there.
 *
 * The lock of the current pool's queue is held inside the loop except
 * while the task function runs and while the thread is parked.
 *
 * Returns NULL when the thread stops: either because the limit of
 * unused threads is already reached, or because the stop marker was
 * popped from unused_thread_queue. */
static gpointer
g_thread_pool_thread_proxy (gpointer data)
{
  GRealThreadPool *pool = data;
  /* TRUE while this thread lingers at an idle pool with a timed wait
   * before moving on to the global pool. */
  gboolean watcher = FALSE;

  g_async_queue_lock (pool->queue);
  while (TRUE)
    {
      gpointer task;
      /* Threads of exclusive pools stay with their pool by default. */
      gboolean goto_global_pool = !pool->pool.exclusive;
      gint len = g_async_queue_length_unlocked (pool->queue);
     
      if (g_thread_should_run (pool, len))
        {
          if (watcher)
            {
              /* This thread is actually not needed here, but it waits
               * for some time anyway. If a new request arrives during
               * that time, this saves process switches. Otherwise the
               * thread will go to the global pool afterwards */
              GTimeVal end_time;
              g_get_current_time (&end_time);
              g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */
              task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
            }
          else
            task = g_async_queue_pop_unlocked (pool->queue);
         
          if (task)
            {
              watcher = FALSE;
              if (pool->num_threads > pool->max_threads &&
                  pool->max_threads != -1)
                /* We are in fact a superfluous thread, so we go to
                 * the global pool and just hand the data further to
                 * the next one waiting in the queue */
                {
                  g_async_queue_push_unlocked (pool->queue, task);
                  goto_global_pool = TRUE;
                }
              else if (pool->running || !pool->immediate)
                {
                  /* Run the task without holding the queue lock, so
                   * other threads can push and pop in the meantime. */
                  g_async_queue_unlock (pool->queue);
                  pool->pool.func (task, pool->pool.user_data);
                  g_async_queue_lock (pool->queue);
                }
            }
          /* The queue may have changed while the task was running. */
          len = g_async_queue_length_unlocked (pool->queue);
        }

      if (!g_thread_should_run (pool, len))
        {
          /* The pool was freed and has nothing left for us; tell a
           * g_thread_pool_free() caller that may be waiting. */
          g_cond_broadcast (inform_cond);
          goto_global_pool = TRUE;
        }
      else if (len > 0)
        {
          /* At this pool there are no threads waiting, but tasks are. */
          goto_global_pool = FALSE;
        }
      else if (len == 0 && !watcher && !pool->pool.exclusive)
        {
          /* Here neither threads nor tasks are queued and we didn't
           * just return from a timed wait. We now wait for a limited
           * time at this pool for new tasks to avoid costly context
           * switches. */
          goto_global_pool = FALSE;
          watcher = TRUE;
        }

      if (goto_global_pool)
        {
          pool->num_threads--;

          if (!pool->running && !pool->waiting)
            {
              /* The pool was freed and nobody waits for it, so the
               * worker threads are responsible for the cleanup. */
              if (pool->num_threads == 0)
                {
                  /* We were the last thread of the pool. */
                  g_async_queue_unlock (pool->queue);
                  g_thread_pool_free_internal (pool);
                }               
              else
                {
                  /* All remaining threads are waiting in the queue and
                   * no task is left: wake them up so they can leave. */
                  if (len == - pool->num_threads)
                    g_thread_pool_wakeup_and_stop_all (pool);

                  g_async_queue_unlock (pool->queue);
                }
            }
          else
            g_async_queue_unlock (pool->queue);
         
          /* Lock order from here on: first the lock of
           * unused_thread_queue, then G_LOCK (unused_threads). */
          g_async_queue_lock (unused_thread_queue);

          G_LOCK (unused_threads);
          if ((unused_threads >= max_unused_threads &&
               max_unused_threads != -1))
            {
              G_UNLOCK (unused_threads);
              g_async_queue_unlock (unused_thread_queue);
              /* Stop this thread */
              return NULL;     
            }
          unused_threads++;
          G_UNLOCK (unused_threads);

          /* Park until a pool that needs a thread, or the stop marker,
           * is pushed onto the global queue. */
          pool = g_async_queue_pop_unlocked (unused_thread_queue);

          G_LOCK (unused_threads);
          unused_threads--;
          G_UNLOCK (unused_threads);

          g_async_queue_unlock (unused_thread_queue);
         
          if (pool == stop_this_thread_marker)
            /* Stop this thread */
            return NULL;
         
          g_async_queue_lock (pool->queue);

          /* pool->num_threads++ is not done here, but in
           * g_thread_pool_start_thread to make the newly started
           * thread known to the pool, before the thread itself can
           * do it. */
        }
    }
  return NULL;
}
197
198static void
199g_thread_pool_start_thread (GRealThreadPool  *pool,
200                            GError          **error)
201{
202  gboolean success = FALSE;
203 
204  if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
205    /* Enough threads are already running */
206    return;
207
208  g_async_queue_lock (unused_thread_queue);
209
210  if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
211    {
212      g_async_queue_push_unlocked (unused_thread_queue, pool);
213      success = TRUE;
214    }
215
216  g_async_queue_unlock (unused_thread_queue);
217
218  if (!success)
219    {
220      GError *local_error = NULL;
221      /* No thread was found, we have to start a new one */
222      g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
223     
224      if (local_error)
225        {
226          g_propagate_error (error, local_error);
227          return;
228        }
229    }
230
231  /* See comment in g_thread_pool_thread_proxy as to why this is done
232   * here and not there */
233  pool->num_threads++;
234}
235
236/**
237 * g_thread_pool_new:
238 * @func: a function to execute in the threads of the new thread pool
239 * @user_data: user data that is handed over to @func every time it
240 *   is called
241 * @max_threads: the maximal number of threads to execute concurrently in
242 *   the new thread pool, -1 means no limit
243 * @exclusive: should this thread pool be exclusive?
244 * @error: return location for error
245 *
246 * This function creates a new thread pool.
247 *
248 * Whenever you call g_thread_pool_push(), either a new thread is
249 * created or an unused one is reused. At most @max_threads threads
250 * are running concurrently for this thread pool. @max_threads = -1
251 * allows unlimited threads to be created for this thread pool. The
252 * newly created or reused thread now executes the function @func with
253 * the two arguments. The first one is the parameter to
254 * g_thread_pool_push() and the second one is @user_data.
255 *
256 * The parameter @exclusive determines, whether the thread pool owns
257 * all threads exclusive or whether the threads are shared
258 * globally. If @exclusive is %TRUE, @max_threads threads are started
259 * immediately and they will run exclusively for this thread pool until
260 * it is destroyed by g_thread_pool_free(). If @exclusive is %FALSE,
261 * threads are created, when needed and shared between all
262 * non-exclusive thread pools. This implies that @max_threads may not
263 * be -1 for exclusive thread pools.
264 *
265 * @error can be %NULL to ignore errors, or non-%NULL to report
266 * errors. An error can only occur when @exclusive is set to %TRUE and
267 * not all @max_threads threads could be created.
268 *
269 * Return value: the new #GThreadPool
270 **/
271GThreadPool*
272g_thread_pool_new (GFunc            func,
273                   gpointer         user_data,
274                   gint             max_threads,
275                   gboolean         exclusive,
276                   GError         **error)
277{
278  GRealThreadPool *retval;
279  G_LOCK_DEFINE_STATIC (init);
280
281  g_return_val_if_fail (func, NULL);
282  g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
283  g_return_val_if_fail (max_threads >= -1, NULL);
284  g_return_val_if_fail (g_thread_supported (), NULL);
285
286  retval = g_new (GRealThreadPool, 1);
287
288  retval->pool.func = func;
289  retval->pool.user_data = user_data;
290  retval->pool.exclusive = exclusive;
291  retval->queue = g_async_queue_new ();
292  retval->max_threads = max_threads;
293  retval->num_threads = 0;
294  retval->running = TRUE;
295
296  G_LOCK (init);
297 
298  if (!inform_mutex)
299    {
300      inform_mutex = g_mutex_new ();
301      inform_cond = g_cond_new ();
302      unused_thread_queue = g_async_queue_new ();
303    }
304
305  G_UNLOCK (init);
306
307  if (retval->pool.exclusive)
308    {
309      g_async_queue_lock (retval->queue);
310 
311      while (retval->num_threads < retval->max_threads)
312        {
313          GError *local_error = NULL;
314          g_thread_pool_start_thread (retval, &local_error);
315          if (local_error)
316            {
317              g_propagate_error (error, local_error);
318              break;
319            }
320        }
321
322      g_async_queue_unlock (retval->queue);
323    }
324
325  return (GThreadPool*) retval;
326}
327
328/**
329 * g_thread_pool_push:
330 * @pool: a #GThreadPool
331 * @data: a new task for @pool
332 * @error: return location for error
333 *
334 * Inserts @data into the list of tasks to be executed by @pool. When
335 * the number of currently running threads is lower than the maximal
336 * allowed number of threads, a new thread is started (or reused) with
337 * the properties given to g_thread_pool_new (). Otherwise @data stays
338 * in the queue until a thread in this pool finishes its previous task
339 * and processes @data.
340 *
341 * @error can be %NULL to ignore errors, or non-%NULL to report
342 * errors. An error can only occur when a new thread couldn't be
343 * created. In that case @data is simply appended to the queue of work
344 * to do. 
345 **/
346void
347g_thread_pool_push (GThreadPool     *pool,
348                    gpointer         data,
349                    GError         **error)
350{
351  GRealThreadPool *real = (GRealThreadPool*) pool;
352
353  g_return_if_fail (real);
354
355  g_async_queue_lock (real->queue);
356 
357  if (!real->running)
358    {
359      g_async_queue_unlock (real->queue);
360      g_return_if_fail (real->running);
361    }
362
363  if (g_async_queue_length_unlocked (real->queue) >= 0)
364    /* No thread is waiting in the queue */
365    g_thread_pool_start_thread (real, error);
366
367  g_async_queue_push_unlocked (real->queue, data);
368  g_async_queue_unlock (real->queue);
369}
370
371/**
372 * g_thread_pool_set_max_threads:
373 * @pool: a #GThreadPool
374 * @max_threads: a new maximal number of threads for @pool
375 * @error: return location for error
376 *
377 * Sets the maximal allowed number of threads for @pool. A value of -1
378 * means, that the maximal number of threads is unlimited.
379 *
380 * Setting @max_threads to 0 means stopping all work for @pool. It is
381 * effectively frozen until @max_threads is set to a non-zero value
382 * again.
383 *
384 * A thread is never terminated while calling @func, as supplied by
385 * g_thread_pool_new (). Instead the maximal number of threads only
386 * has effect for the allocation of new threads in g_thread_pool_push().
387 * A new thread is allocated, whenever the number of currently
388 * running threads in @pool is smaller than the maximal number.
389 *
390 * @error can be %NULL to ignore errors, or non-%NULL to report
391 * errors. An error can only occur when a new thread couldn't be
392 * created.
393 **/
394void
395g_thread_pool_set_max_threads (GThreadPool     *pool,
396                               gint             max_threads,
397                               GError         **error)
398{
399  GRealThreadPool *real = (GRealThreadPool*) pool;
400  gint to_start;
401
402  g_return_if_fail (real);
403  g_return_if_fail (real->running);
404  g_return_if_fail (!real->pool.exclusive || max_threads != -1);
405  g_return_if_fail (max_threads >= -1);
406
407  g_async_queue_lock (real->queue);
408
409  real->max_threads = max_threads;
410 
411  if (pool->exclusive)
412    to_start = real->max_threads - real->num_threads;
413  else
414    to_start = g_async_queue_length_unlocked (real->queue);
415 
416  for ( ; to_start > 0; to_start--)
417    {
418      GError *local_error = NULL;
419      g_thread_pool_start_thread (real, &local_error);
420      if (local_error)
421        {
422          g_propagate_error (error, local_error);
423          break;
424        }
425    }
426   
427  g_async_queue_unlock (real->queue);
428}
429
430/**
431 * g_thread_pool_get_max_threads:
432 * @pool: a #GThreadPool
433 *
434 * Returns the maximal number of threads for @pool.
435 *
436 * Return value: the maximal number of threads
437 **/
438gint
439g_thread_pool_get_max_threads (GThreadPool     *pool)
440{
441  GRealThreadPool *real = (GRealThreadPool*) pool;
442  gint retval;
443
444  g_return_val_if_fail (real, 0);
445  g_return_val_if_fail (real->running, 0);
446
447  g_async_queue_lock (real->queue);
448
449  retval = real->max_threads;
450   
451  g_async_queue_unlock (real->queue);
452
453  return retval;
454}
455
456/**
457 * g_thread_pool_get_num_threads:
458 * @pool: a #GThreadPool
459 *
460 * Returns the number of threads currently running in @pool.
461 *
462 * Return value: the number of threads currently running
463 **/
464guint
465g_thread_pool_get_num_threads (GThreadPool     *pool)
466{
467  GRealThreadPool *real = (GRealThreadPool*) pool;
468  guint retval;
469
470  g_return_val_if_fail (real, 0);
471  g_return_val_if_fail (real->running, 0);
472
473  g_async_queue_lock (real->queue);
474
475  retval = real->num_threads;
476   
477  g_async_queue_unlock (real->queue);
478
479  return retval;
480}
481
482/**
483 * g_thread_pool_unprocessed:
484 * @pool: a #GThreadPool
485 *
486 * Returns the number of tasks still unprocessed in @pool.
487 *
488 * Return value: the number of unprocessed tasks
489 **/
490guint
491g_thread_pool_unprocessed (GThreadPool     *pool)
492{
493  GRealThreadPool *real = (GRealThreadPool*) pool;
494  gint unprocessed;
495
496  g_return_val_if_fail (real, 0);
497  g_return_val_if_fail (real->running, 0);
498
499  unprocessed = g_async_queue_length (real->queue);
500
501  return MAX (unprocessed, 0);
502}
503
/**
 * g_thread_pool_free:
 * @pool: a #GThreadPool
 * @immediate: should @pool shut down immediately?
 * @wait: should the function wait for all tasks to be finished?
 *
 * Frees all resources allocated for @pool.
 *
 * If @immediate is %TRUE, no new task is processed for
 * @pool. Otherwise @pool is not freed before the last task is
 * processed. Note however, that no thread of this pool is
 * interrupted while processing a task. Instead at least all still
 * running threads can finish their tasks before the @pool is freed.
 *
 * If @wait is %TRUE, the function does not return before all tasks
 * to be processed (dependent on @immediate, whether all or only the
 * currently running) are ready. Otherwise the function returns immediately.
 *
 * After calling this function @pool must not be used anymore.
 **/
void
g_thread_pool_free (GThreadPool     *pool,
                    gboolean         immediate,
                    gboolean         wait)
{
  GRealThreadPool *real = (GRealThreadPool*) pool;

  g_return_if_fail (real);
  g_return_if_fail (real->running);
  /* If there's no thread allowed here, there is not much sense in
   * not stopping this pool immediately, when it's not empty */
  g_return_if_fail (immediate || real->max_threads != 0 ||
                    g_async_queue_length (real->queue) == 0);

  g_async_queue_lock (real->queue);

  /* From here on the worker threads see the pool as shut down; the
   * three flags are changed together under the queue lock. */
  real->running = FALSE;
  real->immediate = immediate;
  real->waiting = wait;

  if (wait)
    {
      /* Wait until the queue length equals -num_threads, i.e. every
       * thread of the pool is waiting in the queue and no task is
       * left.
       * NOTE(review): g_thread_pool_thread_proxy broadcasts
       * inform_cond without holding inform_mutex, so a broadcast sent
       * between the queue unlock and g_cond_wait() below looks like it
       * could be missed -- confirm. */
      g_mutex_lock (inform_mutex);
      while (g_async_queue_length_unlocked (real->queue) != -real->num_threads)
        {
          g_async_queue_unlock (real->queue);
          g_cond_wait (inform_cond, inform_mutex);
          g_async_queue_lock (real->queue);
        }
      g_mutex_unlock (inform_mutex);
    }

  if (g_async_queue_length_unlocked (real->queue) == -real->num_threads)
    {
      /* No thread is currently doing something (and nothing is left
       * to process in the queue) */
      if (real->num_threads == 0) /* No threads left, we clean up */
        {
          g_async_queue_unlock (real->queue);
          g_thread_pool_free_internal (real);
          return;
        }

      /* Idle threads are still waiting in the queue: wake them up so
       * they can leave the pool. */
      g_thread_pool_wakeup_and_stop_all (real);
    }
 
  real->waiting = FALSE; /* The last thread should cleanup the pool */
  g_async_queue_unlock (real->queue);
}
573
574static void
575g_thread_pool_free_internal (GRealThreadPool* pool)
576{
577  g_return_if_fail (pool);
578  g_return_if_fail (!pool->running);
579  g_return_if_fail (pool->num_threads == 0);
580
581  g_async_queue_unref (pool->queue);
582
583  g_free (pool);
584}
585
586static void
587g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
588{
589  guint i;
590 
591  g_return_if_fail (pool);
592  g_return_if_fail (!pool->running);
593  g_return_if_fail (pool->num_threads != 0);
594  g_return_if_fail (g_async_queue_length_unlocked (pool->queue) ==
595                    -pool->num_threads);
596
597  pool->immediate = TRUE;
598  for (i = 0; i < pool->num_threads; i++)
599    g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
600}
601
602/**
603 * g_thread_pool_set_max_unused_threads:
604 * @max_threads: maximal number of unused threads
605 *
606 * Sets the maximal number of unused threads to @max_threads. If
607 * @max_threads is -1, no limit is imposed on the number of unused
608 * threads.
609 **/
610void
611g_thread_pool_set_max_unused_threads (gint max_threads)
612{
613  g_return_if_fail (max_threads >= -1); 
614
615  G_LOCK (unused_threads);
616 
617  max_unused_threads = max_threads;
618
619  if (max_unused_threads < unused_threads && max_unused_threads != -1)
620    {
621      guint i;
622
623      g_async_queue_lock (unused_thread_queue);
624      for (i = unused_threads - max_unused_threads; i > 0; i--)
625        g_async_queue_push_unlocked (unused_thread_queue,
626                                     stop_this_thread_marker);
627      g_async_queue_unlock (unused_thread_queue);
628    }
629   
630  G_UNLOCK (unused_threads);
631}
632
633/**
634 * g_thread_pool_get_max_unused_threads:
635 *
636 * Returns the maximal allowed number of unused threads.
637 *
638 * Return value: the maximal number of unused threads
639 **/
640gint
641g_thread_pool_get_max_unused_threads (void)
642{
643  gint retval;
644 
645  G_LOCK (unused_threads);
646  retval = max_unused_threads;
647  G_UNLOCK (unused_threads);
648
649  return retval;
650}
651
652/**
653 * g_thread_pool_get_num_unused_threads:
654 *
655 * Returns the number of currently unused threads.
656 *
657 * Return value: the number of currently unused threads
658 **/
659guint g_thread_pool_get_num_unused_threads (void)
660{
661  guint retval;
662 
663  G_LOCK (unused_threads);
664  retval = unused_threads;
665  G_UNLOCK (unused_threads);
666
667  return retval;
668}
669
670/**
671 * g_thread_pool_stop_unused_threads:
672 *
673 * Stops all currently unused threads. This does not change the
674 * maximal number of unused threads. This function can be used to
675 * regularly stop all unused threads e.g. from g_timeout_add().
676 **/
677void g_thread_pool_stop_unused_threads (void)
678{
679  guint oldval = g_thread_pool_get_max_unused_threads ();
680  g_thread_pool_set_max_unused_threads (0);
681  g_thread_pool_set_max_unused_threads (oldval);
682}
Note: See TracBrowser for help on using the repository browser.