1 | /* GLIB - Library of useful routines for C programming |
---|
2 | * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald |
---|
3 | * |
---|
4 | * GAsyncQueue: thread pool implementation. |
---|
5 | * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe |
---|
6 | * |
---|
7 | * This library is free software; you can redistribute it and/or |
---|
8 | * modify it under the terms of the GNU Lesser General Public |
---|
9 | * License as published by the Free Software Foundation; either |
---|
10 | * version 2 of the License, or (at your option) any later version. |
---|
11 | * |
---|
12 | * This library is distributed in the hope that it will be useful, |
---|
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
---|
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
---|
15 | * Lesser General Public License for more details. |
---|
16 | * |
---|
17 | * You should have received a copy of the GNU Lesser General Public |
---|
18 | * License along with this library; if not, write to the |
---|
19 | * Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
---|
20 | * Boston, MA 02111-1307, USA. |
---|
21 | */ |
---|
22 | |
---|
23 | /* |
---|
24 | * MT safe |
---|
25 | */ |
---|
26 | |
---|
27 | #include "config.h" |
---|
28 | |
---|
29 | #include "glib.h" |
---|
30 | |
---|
31 | |
---|
32 | typedef struct _GRealThreadPool GRealThreadPool; |
---|
33 | |
---|
34 | struct _GRealThreadPool |
---|
35 | { |
---|
36 | GThreadPool pool; |
---|
37 | GAsyncQueue* queue; |
---|
38 | gint max_threads; |
---|
39 | gint num_threads; |
---|
40 | gboolean running; |
---|
41 | gboolean immediate; |
---|
42 | gboolean waiting; |
---|
43 | }; |
---|
44 | |
---|
45 | /* The following is just an address to mark the stop order for a |
---|
46 | * thread, it could be any address (as long, as it isn't a valid |
---|
47 | * GThreadPool address) */ |
---|
48 | static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new; |
---|
49 | |
---|
50 | /* Here all unused threads are waiting */ |
---|
51 | static GAsyncQueue *unused_thread_queue; |
---|
52 | static gint unused_threads = 0; |
---|
53 | static gint max_unused_threads = 0; |
---|
54 | G_LOCK_DEFINE_STATIC (unused_threads); |
---|
55 | |
---|
56 | static GMutex *inform_mutex = NULL; |
---|
57 | static GCond *inform_cond = NULL; |
---|
58 | |
---|
59 | static void g_thread_pool_free_internal (GRealThreadPool* pool); |
---|
60 | static gpointer g_thread_pool_thread_proxy (gpointer data); |
---|
61 | static void g_thread_pool_start_thread (GRealThreadPool* pool, |
---|
62 | GError **error); |
---|
63 | static void g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool); |
---|
64 | |
---|
65 | #define g_thread_should_run(pool, len) \ |
---|
66 | ((pool)->running || (!(pool)->immediate && (len) > 0)) |
---|
67 | |
---|
68 | static gpointer |
---|
69 | g_thread_pool_thread_proxy (gpointer data) |
---|
70 | { |
---|
71 | GRealThreadPool *pool = data; |
---|
72 | gboolean watcher = FALSE; |
---|
73 | |
---|
74 | g_async_queue_lock (pool->queue); |
---|
75 | while (TRUE) |
---|
76 | { |
---|
77 | gpointer task; |
---|
78 | gboolean goto_global_pool = !pool->pool.exclusive; |
---|
79 | gint len = g_async_queue_length_unlocked (pool->queue); |
---|
80 | |
---|
81 | if (g_thread_should_run (pool, len)) |
---|
82 | { |
---|
83 | if (watcher) |
---|
84 | { |
---|
85 | /* This thread is actually not needed here, but it waits |
---|
86 | * for some time anyway. If during that time a new |
---|
87 | * request arrives, this saves process |
---|
88 | * swicthes. Otherwise the thread will go to the global |
---|
89 | * pool afterwards */ |
---|
90 | GTimeVal end_time; |
---|
91 | g_get_current_time (&end_time); |
---|
92 | g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */ |
---|
93 | task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time); |
---|
94 | } |
---|
95 | else |
---|
96 | task = g_async_queue_pop_unlocked (pool->queue); |
---|
97 | |
---|
98 | if (task) |
---|
99 | { |
---|
100 | watcher = FALSE; |
---|
101 | if (pool->num_threads > pool->max_threads && |
---|
102 | pool->max_threads != -1) |
---|
103 | /* We are in fact a superfluous threads, so we go to |
---|
104 | * the global pool and just hand the data further to |
---|
105 | * the next one waiting in the queue */ |
---|
106 | { |
---|
107 | g_async_queue_push_unlocked (pool->queue, task); |
---|
108 | goto_global_pool = TRUE; |
---|
109 | } |
---|
110 | else if (pool->running || !pool->immediate) |
---|
111 | { |
---|
112 | g_async_queue_unlock (pool->queue); |
---|
113 | pool->pool.func (task, pool->pool.user_data); |
---|
114 | g_async_queue_lock (pool->queue); |
---|
115 | } |
---|
116 | } |
---|
117 | len = g_async_queue_length_unlocked (pool->queue); |
---|
118 | } |
---|
119 | |
---|
120 | if (!g_thread_should_run (pool, len)) |
---|
121 | { |
---|
122 | g_cond_broadcast (inform_cond); |
---|
123 | goto_global_pool = TRUE; |
---|
124 | } |
---|
125 | else if (len > 0) |
---|
126 | { |
---|
127 | /* At this pool there are no threads waiting, but tasks are. */ |
---|
128 | goto_global_pool = FALSE; |
---|
129 | } |
---|
130 | else if (len == 0 && !watcher && !pool->pool.exclusive) |
---|
131 | { |
---|
132 | /* Here neither threads nor tasks are queued and we didn't |
---|
133 | * just return from a timed wait. We now wait for a limited |
---|
134 | * time at this pool for new tasks to avoid costly context |
---|
135 | * switches. */ |
---|
136 | goto_global_pool = FALSE; |
---|
137 | watcher = TRUE; |
---|
138 | } |
---|
139 | |
---|
140 | if (goto_global_pool) |
---|
141 | { |
---|
142 | pool->num_threads--; |
---|
143 | |
---|
144 | if (!pool->running && !pool->waiting) |
---|
145 | { |
---|
146 | if (pool->num_threads == 0) |
---|
147 | { |
---|
148 | g_async_queue_unlock (pool->queue); |
---|
149 | g_thread_pool_free_internal (pool); |
---|
150 | } |
---|
151 | else |
---|
152 | { |
---|
153 | if (len == - pool->num_threads) |
---|
154 | g_thread_pool_wakeup_and_stop_all (pool); |
---|
155 | |
---|
156 | g_async_queue_unlock (pool->queue); |
---|
157 | } |
---|
158 | } |
---|
159 | else |
---|
160 | g_async_queue_unlock (pool->queue); |
---|
161 | |
---|
162 | g_async_queue_lock (unused_thread_queue); |
---|
163 | |
---|
164 | G_LOCK (unused_threads); |
---|
165 | if ((unused_threads >= max_unused_threads && |
---|
166 | max_unused_threads != -1)) |
---|
167 | { |
---|
168 | G_UNLOCK (unused_threads); |
---|
169 | g_async_queue_unlock (unused_thread_queue); |
---|
170 | /* Stop this thread */ |
---|
171 | return NULL; |
---|
172 | } |
---|
173 | unused_threads++; |
---|
174 | G_UNLOCK (unused_threads); |
---|
175 | |
---|
176 | pool = g_async_queue_pop_unlocked (unused_thread_queue); |
---|
177 | |
---|
178 | G_LOCK (unused_threads); |
---|
179 | unused_threads--; |
---|
180 | G_UNLOCK (unused_threads); |
---|
181 | |
---|
182 | g_async_queue_unlock (unused_thread_queue); |
---|
183 | |
---|
184 | if (pool == stop_this_thread_marker) |
---|
185 | /* Stop this thread */ |
---|
186 | return NULL; |
---|
187 | |
---|
188 | g_async_queue_lock (pool->queue); |
---|
189 | |
---|
190 | /* pool->num_threads++ is not done here, but in |
---|
191 | * g_thread_pool_start_thread to make the new started thread |
---|
192 | * known to the pool, before itself can do it. */ |
---|
193 | } |
---|
194 | } |
---|
195 | return NULL; |
---|
196 | } |
---|
197 | |
---|
198 | static void |
---|
199 | g_thread_pool_start_thread (GRealThreadPool *pool, |
---|
200 | GError **error) |
---|
201 | { |
---|
202 | gboolean success = FALSE; |
---|
203 | |
---|
204 | if (pool->num_threads >= pool->max_threads && pool->max_threads != -1) |
---|
205 | /* Enough threads are already running */ |
---|
206 | return; |
---|
207 | |
---|
208 | g_async_queue_lock (unused_thread_queue); |
---|
209 | |
---|
210 | if (g_async_queue_length_unlocked (unused_thread_queue) < 0) |
---|
211 | { |
---|
212 | g_async_queue_push_unlocked (unused_thread_queue, pool); |
---|
213 | success = TRUE; |
---|
214 | } |
---|
215 | |
---|
216 | g_async_queue_unlock (unused_thread_queue); |
---|
217 | |
---|
218 | if (!success) |
---|
219 | { |
---|
220 | GError *local_error = NULL; |
---|
221 | /* No thread was found, we have to start a new one */ |
---|
222 | g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error); |
---|
223 | |
---|
224 | if (local_error) |
---|
225 | { |
---|
226 | g_propagate_error (error, local_error); |
---|
227 | return; |
---|
228 | } |
---|
229 | } |
---|
230 | |
---|
231 | /* See comment in g_thread_pool_thread_proxy as to why this is done |
---|
232 | * here and not there */ |
---|
233 | pool->num_threads++; |
---|
234 | } |
---|
235 | |
---|
236 | /** |
---|
237 | * g_thread_pool_new: |
---|
238 | * @func: a function to execute in the threads of the new thread pool |
---|
239 | * @user_data: user data that is handed over to @func every time it |
---|
240 | * is called |
---|
241 | * @max_threads: the maximal number of threads to execute concurrently in |
---|
242 | * the new thread pool, -1 means no limit |
---|
243 | * @exclusive: should this thread pool be exclusive? |
---|
244 | * @error: return location for error |
---|
245 | * |
---|
246 | * This function creates a new thread pool. |
---|
247 | * |
---|
248 | * Whenever you call g_thread_pool_push(), either a new thread is |
---|
249 | * created or an unused one is reused. At most @max_threads threads |
---|
250 | * are running concurrently for this thread pool. @max_threads = -1 |
---|
251 | * allows unlimited threads to be created for this thread pool. The |
---|
252 | * newly created or reused thread now executes the function @func with |
---|
253 | * the two arguments. The first one is the parameter to |
---|
254 | * g_thread_pool_push() and the second one is @user_data. |
---|
255 | * |
---|
256 | * The parameter @exclusive determines, whether the thread pool owns |
---|
257 | * all threads exclusive or whether the threads are shared |
---|
258 | * globally. If @exclusive is %TRUE, @max_threads threads are started |
---|
259 | * immediately and they will run exclusively for this thread pool until |
---|
260 | * it is destroyed by g_thread_pool_free(). If @exclusive is %FALSE, |
---|
261 | * threads are created, when needed and shared between all |
---|
262 | * non-exclusive thread pools. This implies that @max_threads may not |
---|
263 | * be -1 for exclusive thread pools. |
---|
264 | * |
---|
265 | * @error can be %NULL to ignore errors, or non-%NULL to report |
---|
266 | * errors. An error can only occur when @exclusive is set to %TRUE and |
---|
267 | * not all @max_threads threads could be created. |
---|
268 | * |
---|
269 | * Return value: the new #GThreadPool |
---|
270 | **/ |
---|
271 | GThreadPool* |
---|
272 | g_thread_pool_new (GFunc func, |
---|
273 | gpointer user_data, |
---|
274 | gint max_threads, |
---|
275 | gboolean exclusive, |
---|
276 | GError **error) |
---|
277 | { |
---|
278 | GRealThreadPool *retval; |
---|
279 | G_LOCK_DEFINE_STATIC (init); |
---|
280 | |
---|
281 | g_return_val_if_fail (func, NULL); |
---|
282 | g_return_val_if_fail (!exclusive || max_threads != -1, NULL); |
---|
283 | g_return_val_if_fail (max_threads >= -1, NULL); |
---|
284 | g_return_val_if_fail (g_thread_supported (), NULL); |
---|
285 | |
---|
286 | retval = g_new (GRealThreadPool, 1); |
---|
287 | |
---|
288 | retval->pool.func = func; |
---|
289 | retval->pool.user_data = user_data; |
---|
290 | retval->pool.exclusive = exclusive; |
---|
291 | retval->queue = g_async_queue_new (); |
---|
292 | retval->max_threads = max_threads; |
---|
293 | retval->num_threads = 0; |
---|
294 | retval->running = TRUE; |
---|
295 | |
---|
296 | G_LOCK (init); |
---|
297 | |
---|
298 | if (!inform_mutex) |
---|
299 | { |
---|
300 | inform_mutex = g_mutex_new (); |
---|
301 | inform_cond = g_cond_new (); |
---|
302 | unused_thread_queue = g_async_queue_new (); |
---|
303 | } |
---|
304 | |
---|
305 | G_UNLOCK (init); |
---|
306 | |
---|
307 | if (retval->pool.exclusive) |
---|
308 | { |
---|
309 | g_async_queue_lock (retval->queue); |
---|
310 | |
---|
311 | while (retval->num_threads < retval->max_threads) |
---|
312 | { |
---|
313 | GError *local_error = NULL; |
---|
314 | g_thread_pool_start_thread (retval, &local_error); |
---|
315 | if (local_error) |
---|
316 | { |
---|
317 | g_propagate_error (error, local_error); |
---|
318 | break; |
---|
319 | } |
---|
320 | } |
---|
321 | |
---|
322 | g_async_queue_unlock (retval->queue); |
---|
323 | } |
---|
324 | |
---|
325 | return (GThreadPool*) retval; |
---|
326 | } |
---|
327 | |
---|
328 | /** |
---|
329 | * g_thread_pool_push: |
---|
330 | * @pool: a #GThreadPool |
---|
331 | * @data: a new task for @pool |
---|
332 | * @error: return location for error |
---|
333 | * |
---|
334 | * Inserts @data into the list of tasks to be executed by @pool. When |
---|
335 | * the number of currently running threads is lower than the maximal |
---|
336 | * allowed number of threads, a new thread is started (or reused) with |
---|
337 | * the properties given to g_thread_pool_new (). Otherwise @data stays |
---|
338 | * in the queue until a thread in this pool finishes its previous task |
---|
339 | * and processes @data. |
---|
340 | * |
---|
341 | * @error can be %NULL to ignore errors, or non-%NULL to report |
---|
342 | * errors. An error can only occur when a new thread couldn't be |
---|
343 | * created. In that case @data is simply appended to the queue of work |
---|
344 | * to do. |
---|
345 | **/ |
---|
346 | void |
---|
347 | g_thread_pool_push (GThreadPool *pool, |
---|
348 | gpointer data, |
---|
349 | GError **error) |
---|
350 | { |
---|
351 | GRealThreadPool *real = (GRealThreadPool*) pool; |
---|
352 | |
---|
353 | g_return_if_fail (real); |
---|
354 | |
---|
355 | g_async_queue_lock (real->queue); |
---|
356 | |
---|
357 | if (!real->running) |
---|
358 | { |
---|
359 | g_async_queue_unlock (real->queue); |
---|
360 | g_return_if_fail (real->running); |
---|
361 | } |
---|
362 | |
---|
363 | if (g_async_queue_length_unlocked (real->queue) >= 0) |
---|
364 | /* No thread is waiting in the queue */ |
---|
365 | g_thread_pool_start_thread (real, error); |
---|
366 | |
---|
367 | g_async_queue_push_unlocked (real->queue, data); |
---|
368 | g_async_queue_unlock (real->queue); |
---|
369 | } |
---|
370 | |
---|
371 | /** |
---|
372 | * g_thread_pool_set_max_threads: |
---|
373 | * @pool: a #GThreadPool |
---|
374 | * @max_threads: a new maximal number of threads for @pool |
---|
375 | * @error: return location for error |
---|
376 | * |
---|
377 | * Sets the maximal allowed number of threads for @pool. A value of -1 |
---|
378 | * means, that the maximal number of threads is unlimited. |
---|
379 | * |
---|
380 | * Setting @max_threads to 0 means stopping all work for @pool. It is |
---|
381 | * effectively frozen until @max_threads is set to a non-zero value |
---|
382 | * again. |
---|
383 | * |
---|
384 | * A thread is never terminated while calling @func, as supplied by |
---|
385 | * g_thread_pool_new (). Instead the maximal number of threads only |
---|
386 | * has effect for the allocation of new threads in g_thread_pool_push(). |
---|
387 | * A new thread is allocated, whenever the number of currently |
---|
388 | * running threads in @pool is smaller than the maximal number. |
---|
389 | * |
---|
390 | * @error can be %NULL to ignore errors, or non-%NULL to report |
---|
391 | * errors. An error can only occur when a new thread couldn't be |
---|
392 | * created. |
---|
393 | **/ |
---|
394 | void |
---|
395 | g_thread_pool_set_max_threads (GThreadPool *pool, |
---|
396 | gint max_threads, |
---|
397 | GError **error) |
---|
398 | { |
---|
399 | GRealThreadPool *real = (GRealThreadPool*) pool; |
---|
400 | gint to_start; |
---|
401 | |
---|
402 | g_return_if_fail (real); |
---|
403 | g_return_if_fail (real->running); |
---|
404 | g_return_if_fail (!real->pool.exclusive || max_threads != -1); |
---|
405 | g_return_if_fail (max_threads >= -1); |
---|
406 | |
---|
407 | g_async_queue_lock (real->queue); |
---|
408 | |
---|
409 | real->max_threads = max_threads; |
---|
410 | |
---|
411 | if (pool->exclusive) |
---|
412 | to_start = real->max_threads - real->num_threads; |
---|
413 | else |
---|
414 | to_start = g_async_queue_length_unlocked (real->queue); |
---|
415 | |
---|
416 | for ( ; to_start > 0; to_start--) |
---|
417 | { |
---|
418 | GError *local_error = NULL; |
---|
419 | g_thread_pool_start_thread (real, &local_error); |
---|
420 | if (local_error) |
---|
421 | { |
---|
422 | g_propagate_error (error, local_error); |
---|
423 | break; |
---|
424 | } |
---|
425 | } |
---|
426 | |
---|
427 | g_async_queue_unlock (real->queue); |
---|
428 | } |
---|
429 | |
---|
430 | /** |
---|
431 | * g_thread_pool_get_max_threads: |
---|
432 | * @pool: a #GThreadPool |
---|
433 | * |
---|
434 | * Returns the maximal number of threads for @pool. |
---|
435 | * |
---|
436 | * Return value: the maximal number of threads |
---|
437 | **/ |
---|
438 | gint |
---|
439 | g_thread_pool_get_max_threads (GThreadPool *pool) |
---|
440 | { |
---|
441 | GRealThreadPool *real = (GRealThreadPool*) pool; |
---|
442 | gint retval; |
---|
443 | |
---|
444 | g_return_val_if_fail (real, 0); |
---|
445 | g_return_val_if_fail (real->running, 0); |
---|
446 | |
---|
447 | g_async_queue_lock (real->queue); |
---|
448 | |
---|
449 | retval = real->max_threads; |
---|
450 | |
---|
451 | g_async_queue_unlock (real->queue); |
---|
452 | |
---|
453 | return retval; |
---|
454 | } |
---|
455 | |
---|
456 | /** |
---|
457 | * g_thread_pool_get_num_threads: |
---|
458 | * @pool: a #GThreadPool |
---|
459 | * |
---|
460 | * Returns the number of threads currently running in @pool. |
---|
461 | * |
---|
462 | * Return value: the number of threads currently running |
---|
463 | **/ |
---|
464 | guint |
---|
465 | g_thread_pool_get_num_threads (GThreadPool *pool) |
---|
466 | { |
---|
467 | GRealThreadPool *real = (GRealThreadPool*) pool; |
---|
468 | guint retval; |
---|
469 | |
---|
470 | g_return_val_if_fail (real, 0); |
---|
471 | g_return_val_if_fail (real->running, 0); |
---|
472 | |
---|
473 | g_async_queue_lock (real->queue); |
---|
474 | |
---|
475 | retval = real->num_threads; |
---|
476 | |
---|
477 | g_async_queue_unlock (real->queue); |
---|
478 | |
---|
479 | return retval; |
---|
480 | } |
---|
481 | |
---|
482 | /** |
---|
483 | * g_thread_pool_unprocessed: |
---|
484 | * @pool: a #GThreadPool |
---|
485 | * |
---|
486 | * Returns the number of tasks still unprocessed in @pool. |
---|
487 | * |
---|
488 | * Return value: the number of unprocessed tasks |
---|
489 | **/ |
---|
490 | guint |
---|
491 | g_thread_pool_unprocessed (GThreadPool *pool) |
---|
492 | { |
---|
493 | GRealThreadPool *real = (GRealThreadPool*) pool; |
---|
494 | gint unprocessed; |
---|
495 | |
---|
496 | g_return_val_if_fail (real, 0); |
---|
497 | g_return_val_if_fail (real->running, 0); |
---|
498 | |
---|
499 | unprocessed = g_async_queue_length (real->queue); |
---|
500 | |
---|
501 | return MAX (unprocessed, 0); |
---|
502 | } |
---|
503 | |
---|
504 | /** |
---|
505 | * g_thread_pool_free: |
---|
506 | * @pool: a #GThreadPool |
---|
507 | * @immediate: should @pool shut down immediately? |
---|
508 | * @wait: should the function wait for all tasks to be finished? |
---|
509 | * |
---|
510 | * Frees all resources allocated for @pool. |
---|
511 | * |
---|
512 | * If @immediate is %TRUE, no new task is processed for |
---|
513 | * @pool. Otherwise @pool is not freed before the last task is |
---|
514 | * processed. Note however, that no thread of this pool is |
---|
515 | * interrupted, while processing a task. Instead at least all still |
---|
516 | * running threads can finish their tasks before the @pool is freed. |
---|
517 | * |
---|
518 | * If @wait is %TRUE, the functions does not return before all tasks |
---|
519 | * to be processed (dependent on @immediate, whether all or only the |
---|
520 | * currently running) are ready. Otherwise the function returns immediately. |
---|
521 | * |
---|
522 | * After calling this function @pool must not be used anymore. |
---|
523 | **/ |
---|
524 | void |
---|
525 | g_thread_pool_free (GThreadPool *pool, |
---|
526 | gboolean immediate, |
---|
527 | gboolean wait) |
---|
528 | { |
---|
529 | GRealThreadPool *real = (GRealThreadPool*) pool; |
---|
530 | |
---|
531 | g_return_if_fail (real); |
---|
532 | g_return_if_fail (real->running); |
---|
533 | /* It there's no thread allowed here, there is not much sense in |
---|
534 | * not stopping this pool immediately, when it's not empty */ |
---|
535 | g_return_if_fail (immediate || real->max_threads != 0 || |
---|
536 | g_async_queue_length (real->queue) == 0); |
---|
537 | |
---|
538 | g_async_queue_lock (real->queue); |
---|
539 | |
---|
540 | real->running = FALSE; |
---|
541 | real->immediate = immediate; |
---|
542 | real->waiting = wait; |
---|
543 | |
---|
544 | if (wait) |
---|
545 | { |
---|
546 | g_mutex_lock (inform_mutex); |
---|
547 | while (g_async_queue_length_unlocked (real->queue) != -real->num_threads) |
---|
548 | { |
---|
549 | g_async_queue_unlock (real->queue); |
---|
550 | g_cond_wait (inform_cond, inform_mutex); |
---|
551 | g_async_queue_lock (real->queue); |
---|
552 | } |
---|
553 | g_mutex_unlock (inform_mutex); |
---|
554 | } |
---|
555 | |
---|
556 | if (g_async_queue_length_unlocked (real->queue) == -real->num_threads) |
---|
557 | { |
---|
558 | /* No thread is currently doing something (and nothing is left |
---|
559 | * to process in the queue) */ |
---|
560 | if (real->num_threads == 0) /* No threads left, we clean up */ |
---|
561 | { |
---|
562 | g_async_queue_unlock (real->queue); |
---|
563 | g_thread_pool_free_internal (real); |
---|
564 | return; |
---|
565 | } |
---|
566 | |
---|
567 | g_thread_pool_wakeup_and_stop_all (real); |
---|
568 | } |
---|
569 | |
---|
570 | real->waiting = FALSE; /* The last thread should cleanup the pool */ |
---|
571 | g_async_queue_unlock (real->queue); |
---|
572 | } |
---|
573 | |
---|
574 | static void |
---|
575 | g_thread_pool_free_internal (GRealThreadPool* pool) |
---|
576 | { |
---|
577 | g_return_if_fail (pool); |
---|
578 | g_return_if_fail (!pool->running); |
---|
579 | g_return_if_fail (pool->num_threads == 0); |
---|
580 | |
---|
581 | g_async_queue_unref (pool->queue); |
---|
582 | |
---|
583 | g_free (pool); |
---|
584 | } |
---|
585 | |
---|
586 | static void |
---|
587 | g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool) |
---|
588 | { |
---|
589 | guint i; |
---|
590 | |
---|
591 | g_return_if_fail (pool); |
---|
592 | g_return_if_fail (!pool->running); |
---|
593 | g_return_if_fail (pool->num_threads != 0); |
---|
594 | g_return_if_fail (g_async_queue_length_unlocked (pool->queue) == |
---|
595 | -pool->num_threads); |
---|
596 | |
---|
597 | pool->immediate = TRUE; |
---|
598 | for (i = 0; i < pool->num_threads; i++) |
---|
599 | g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1)); |
---|
600 | } |
---|
601 | |
---|
602 | /** |
---|
603 | * g_thread_pool_set_max_unused_threads: |
---|
604 | * @max_threads: maximal number of unused threads |
---|
605 | * |
---|
606 | * Sets the maximal number of unused threads to @max_threads. If |
---|
607 | * @max_threads is -1, no limit is imposed on the number of unused |
---|
608 | * threads. |
---|
609 | **/ |
---|
610 | void |
---|
611 | g_thread_pool_set_max_unused_threads (gint max_threads) |
---|
612 | { |
---|
613 | g_return_if_fail (max_threads >= -1); |
---|
614 | |
---|
615 | G_LOCK (unused_threads); |
---|
616 | |
---|
617 | max_unused_threads = max_threads; |
---|
618 | |
---|
619 | if (max_unused_threads < unused_threads && max_unused_threads != -1) |
---|
620 | { |
---|
621 | guint i; |
---|
622 | |
---|
623 | g_async_queue_lock (unused_thread_queue); |
---|
624 | for (i = unused_threads - max_unused_threads; i > 0; i--) |
---|
625 | g_async_queue_push_unlocked (unused_thread_queue, |
---|
626 | stop_this_thread_marker); |
---|
627 | g_async_queue_unlock (unused_thread_queue); |
---|
628 | } |
---|
629 | |
---|
630 | G_UNLOCK (unused_threads); |
---|
631 | } |
---|
632 | |
---|
633 | /** |
---|
634 | * g_thread_pool_get_max_unused_threads: |
---|
635 | * |
---|
636 | * Returns the maximal allowed number of unused threads. |
---|
637 | * |
---|
638 | * Return value: the maximal number of unused threads |
---|
639 | **/ |
---|
640 | gint |
---|
641 | g_thread_pool_get_max_unused_threads (void) |
---|
642 | { |
---|
643 | gint retval; |
---|
644 | |
---|
645 | G_LOCK (unused_threads); |
---|
646 | retval = max_unused_threads; |
---|
647 | G_UNLOCK (unused_threads); |
---|
648 | |
---|
649 | return retval; |
---|
650 | } |
---|
651 | |
---|
652 | /** |
---|
653 | * g_thread_pool_get_num_unused_threads: |
---|
654 | * |
---|
655 | * Returns the number of currently unused threads. |
---|
656 | * |
---|
657 | * Return value: the number of currently unused threads |
---|
658 | **/ |
---|
659 | guint g_thread_pool_get_num_unused_threads (void) |
---|
660 | { |
---|
661 | guint retval; |
---|
662 | |
---|
663 | G_LOCK (unused_threads); |
---|
664 | retval = unused_threads; |
---|
665 | G_UNLOCK (unused_threads); |
---|
666 | |
---|
667 | return retval; |
---|
668 | } |
---|
669 | |
---|
670 | /** |
---|
671 | * g_thread_pool_stop_unused_threads: |
---|
672 | * |
---|
673 | * Stops all currently unused threads. This does not change the |
---|
674 | * maximal number of unused threads. This function can be used to |
---|
675 | * regularly stop all unused threads e.g. from g_timeout_add(). |
---|
676 | **/ |
---|
677 | void g_thread_pool_stop_unused_threads (void) |
---|
678 | { |
---|
679 | guint oldval = g_thread_pool_get_max_unused_threads (); |
---|
680 | g_thread_pool_set_max_unused_threads (0); |
---|
681 | g_thread_pool_set_max_unused_threads (oldval); |
---|
682 | } |
---|