http://blog.csdn.net/phus/archive/2005/06/09/390745.aspx
thrmgr.h 文件
/*
 *  Copyright (C) 2004 Trog
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#ifndef __THRMGR_H__
#define __THRMGR_H__

#include <pthread.h>
#include <sys/time.h>

/* One queued job: a singly linked node carrying the caller's data pointer. */
typedef struct work_item_tag {
	struct work_item_tag *next;	/* next node, NULL at the tail */
	void *data;			/* opaque pointer passed to the handler */
	struct timeval time_queued;	/* timestamp taken when the job was queued */
} work_item_t;

/* FIFO of work items: jobs are appended at tail and removed from head. */
typedef struct work_queue_tag {
	work_item_t *head;
	work_item_t *tail;
	int item_count;
} work_queue_t;

/* Life-cycle state of a thread pool. */
typedef enum {
	POOL_INVALID,
	POOL_VALID,	/* set by thrmgr_new() once the pool is fully built */
	POOL_EXIT,	/* set by thrmgr_destroy() to make the workers leave */
} pool_state_t;

/* A pool of detached worker threads fed from a single work queue. */
typedef struct threadpool_tag {
	pthread_mutex_t pool_mutex;	/* guards every field below */
	pthread_cond_t pool_cond;	/* signalled on new work and on shutdown */
	pthread_attr_t pool_attr;	/* attributes used to create the workers */

	pool_state_t state;
	int thr_max;			/* upper bound on live worker threads */
	int thr_alive;			/* worker threads currently running */
	int thr_idle;			/* workers currently waiting for a job */
	int idle_timeout;		/* seconds an idle worker waits before exiting */

	void (*handler)(void *);	/* called once for each dispatched job */

	work_queue_t *queue;
} threadpool_t;

/* Create a pool; returns NULL on failure. */
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *));
/* Stop the workers and release the pool. */
void thrmgr_destroy(threadpool_t *threadpool);
/* Queue user_data for the handler; returns non-zero on success, 0 on failure. */
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data);

#endif /* __THRMGR_H__ */
thrmgr.c 文件
/*
 *  Copyright (C) 2004 Trog
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>

#include "thrmgr.h"

#include "others.h"
#include "memory.h"
#include "output.h"
/* Boolean values used for must_exit and for thrmgr_dispatch()'s result. */
#define FALSE (0)
#define TRUE (1)
work_queue_t *work_queue_![](/icons/91168new.gif)
![](/icons/91168kh.gif)
S63fbaiducuk5CO{
S63fbaiducuk5COwork_queue_t *work_q;
S63fbaiducuk5CO
S63fbaiducuk5COwork_q = (work_queue_t *) mmalloc(
(work_queue_t));
S63fbaiducuk5CO
S63fbaiducuk5COwork_q->head = work_q->tail = NULL;
S63fbaiducuk5COwork_q->item_count = 0;
S63fbaiducuk5CO
work_q;
S63fbaiducuk5CO}
/*
 * Append a job to the tail of the queue.
 *
 * work_q: queue to append to; a NULL queue is ignored.
 * data:   opaque pointer stored in the new node and later returned by
 *         work_queue_pop().
 *
 * The node records the time at which it was queued. The function performs
 * no locking itself.
 */
void work_queue_add(work_queue_t *work_q, void *data)
{
	work_item_t *work_item;

	if (!work_q) {
		return;
	}

	work_item = (work_item_t *) mmalloc(sizeof(work_item_t));
	if (!work_item) {
		/* Without a node nothing can be queued; report it rather than
		 * dereferencing a null pointer. */
		logg("!work_queue_add: memory allocation failed\n");
		return;
	}
	work_item->next = NULL;
	work_item->data = data;
	gettimeofday(&(work_item->time_queued), NULL);

	if (work_q->head == NULL) {
		/* First element: it is both head and tail. */
		work_q->head = work_q->tail = work_item;
		work_q->item_count = 1;
	} else {
		work_q->tail->next = work_item;
		work_q->tail = work_item;
		work_q->item_count++;
	}
}
/*
 * Remove the oldest job from the queue.
 *
 * Returns the data pointer stored by work_queue_add(), or NULL when work_q
 * is NULL or the queue is empty. The queue node itself is freed here; the
 * data pointer is handed back untouched.
 */
void *work_queue_pop(work_queue_t *work_q)
{
	work_item_t *work_item;
	void *data;

	if (!work_q || !work_q->head) {
		return NULL;
	}

	work_item = work_q->head;
	data = work_item->data;
	work_q->head = work_item->next;
	if (work_q->head == NULL) {
		/* Removed the last element, so there is no tail either. */
		work_q->tail = NULL;
	}
	/* Keep the counter in step with work_queue_add(), which increments it. */
	work_q->item_count--;
	free(work_item);
	return data;
}
/*
 * Shut a thread pool down and release it.
 *
 * Does nothing when threadpool is NULL or not in state POOL_VALID. Otherwise
 * the pool is switched to POOL_EXIT, all workers are woken, and the caller
 * blocks until thr_alive drops to zero. The synchronisation objects, the
 * work queue and the pool itself are then freed.
 *
 * If waking or waiting for the workers fails, the function returns early and
 * leaves the pool allocated (in state POOL_EXIT), because workers may still
 * be using it. A failing mutex lock/unlock terminates the process.
 */
void thrmgr_destroy(threadpool_t *threadpool)
{
	if (!threadpool || (threadpool->state != POOL_VALID)) {
		return;
	}

	if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
		logg("!Mutex lock failed\n");
		exit(-1);
	}
	threadpool->state = POOL_EXIT;

	/* wait for threads to exit */
	if (threadpool->thr_alive > 0) {
		/* Wake every waiting worker so it can notice POOL_EXIT. */
		if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
			pthread_mutex_unlock(&threadpool->pool_mutex);
			return;
		}
	}
	while (threadpool->thr_alive > 0) {
		/* The last worker to leave broadcasts on pool_cond; this is
		 * the waiter that signal is meant for. */
		if (pthread_cond_wait(&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
			pthread_mutex_unlock(&threadpool->pool_mutex);
			return;
		}
	}

	if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
		logg("!Mutex unlock failed\n");
		exit(-1);
	}

	pthread_mutex_destroy(&(threadpool->pool_mutex));
	pthread_cond_destroy(&(threadpool->pool_cond));
	pthread_attr_destroy(&(threadpool->pool_attr));

	if (threadpool->queue) {
		/* No worker is left, so release any nodes still queued. Only
		 * the nodes are freed: the data pointers belong to whoever
		 * dispatched them. */
		while (threadpool->queue->head != NULL) {
			(void) work_queue_pop(threadpool->queue);
		}
		free(threadpool->queue);
	}
	free(threadpool);
}
threadpool_t *thrmgr_
(
max_threads,
idle_timeout, void (*handler)(void *))
S63fbaiducuk5CO{
S63fbaiducuk5COthreadpool_t *threadpool;
S63fbaiducuk5CO
S63fbaiducuk5CO
(max_threads <= 0) {
S63fbaiducuk5CO
NULL;
S63fbaiducuk5CO}
S63fbaiducuk5CO
S63fbaiducuk5COthreadpool = (threadpool_t *) mmalloc(
(threadpool_t));
threadpool->queue = work_queue_![](/icons/91168new.gif)
;
S63fbaiducuk5CO
(!threadpool->queue) {
S63fbaiducuk5COfree(threadpool);
S63fbaiducuk5CO
NULL;
S63fbaiducuk5CO}
S63fbaiducuk5COthreadpool->thr_max = max_threads;
S63fbaiducuk5COthreadpool->thr_alive = 0;
S63fbaiducuk5COthreadpool->thr_idle = 0;
S63fbaiducuk5COthreadpool->idle_timeout = idle_timeout;
S63fbaiducuk5COthreadpool->handler = handler;
S63fbaiducuk5CO
S63fbaiducuk5COpthread_mutex_init(&(threadpool->pool_mutex), NULL);
S63fbaiducuk5CO
(pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
S63fbaiducuk5COfree(threadpool);
S63fbaiducuk5CO
NULL;
S63fbaiducuk5CO}
S63fbaiducuk5CO
S63fbaiducuk5CO
(pthread_attr_init(&(threadpool->pool_attr)) != 0) {
S63fbaiducuk5COfree(threadpool);
S63fbaiducuk5CO
NULL;
S63fbaiducuk5CO}
S63fbaiducuk5CO
S63fbaiducuk5CO
(pthread_attr_
detachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
S63fbaiducuk5COfree(threadpool);
S63fbaiducuk5CO
NULL;
S63fbaiducuk5CO}
S63fbaiducuk5COthreadpool->state = POOL_VALID;
threadpool;
S63fbaiducuk5CO}
/*工作线程.该工作线程遍历工作链表,如果有活干就干,没活干就等活干,难怪叫民工
*/
void *thrmgr_worker(void *arg)
S63fbaiducuk5CO{
S63fbaiducuk5COthreadpool_t *threadpool = (threadpool_t *) arg;
S63fbaiducuk5COvoid *job_data;
S63fbaiducuk5CO
retval, must_exit = FALSE;
S63fbaiducuk5COstruct timespec timeout;
S63fbaiducuk5CO
S63fbaiducuk5CO/* loop looking for work */
S63fbaiducuk5COfor (;;) {
S63fbaiducuk5CO
(pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
S63fbaiducuk5CO/* Fatal error */
S63fbaiducuk5COlogg("!Fatal: mutex lock failed\n");
S63fbaiducuk5COexit(-2);
S63fbaiducuk5CO}
S63fbaiducuk5COtimeout.tv_sec = time(NULL) + threadpool->idle_timeout;
S63fbaiducuk5COtimeout.tv_nsec = 0;
S63fbaiducuk5COthreadpool->thr_idle
;
S63fbaiducuk5COwhile (((job_data=work_queue_pop(threadpool->queue))
NULL)
S63fbaiducuk5CO&& (threadpool->state != POOL_EXIT)) {
S63fbaiducuk5CO/* Sleep, awaiting wakeup ,注意,民工等
段时间,如果没有活干就结束该线程*/
S63fbaiducuk5COretval = pthread_cond_timedwait(&(threadpool->pool_cond),
S63fbaiducuk5CO&(threadpool->pool_mutex), &timeout);
S63fbaiducuk5CO
(retval
ETIMEDOUT) {
S63fbaiducuk5COmust_exit = TRUE;
S63fbaiducuk5CO
;
S63fbaiducuk5CO}
S63fbaiducuk5CO}
S63fbaiducuk5COthreadpool->thr_idle--;//要干活了,闲着
民工少了
位
S63fbaiducuk5CO
(threadpool->state
POOL_EXIT) {
S63fbaiducuk5COmust_exit = TRUE;
S63fbaiducuk5CO}
S63fbaiducuk5CO
S63fbaiducuk5CO
(pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
S63fbaiducuk5CO/* Fatal error */
S63fbaiducuk5COlogg("!Fatal: mutex unlock failed\n");
S63fbaiducuk5COexit(-2);
S63fbaiducuk5CO}
S63fbaiducuk5CO
(job_data) {
S63fbaiducuk5COthreadpool->handler(job_data);
S63fbaiducuk5CO}
(must_exit) {
/*如果没有等到活或者要结束整个线程池时,该线程收工*/
S63fbaiducuk5CO
;
S63fbaiducuk5CO}
S63fbaiducuk5CO}
S63fbaiducuk5CO
(pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
S63fbaiducuk5CO/* Fatal error */
S63fbaiducuk5COlogg("!Fatal: mutex lock failed\n");
S63fbaiducuk5COexit(-2);
S63fbaiducuk5CO}
S63fbaiducuk5COthreadpool->thr_alive--;//活干完了,该走人了(人又少了
个)
S63fbaiducuk5CO
(threadpool->thr_alive
0) {
S63fbaiducuk5CO/* signal that all threads are finished */
S63fbaiducuk5COpthread_cond_broadcast(&threadpool->pool_cond);//人都跑光了,谁还听得到这个信号?多次
举吗?
S63fbaiducuk5CO}
S63fbaiducuk5CO
(pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
S63fbaiducuk5CO/* Fatal error */
S63fbaiducuk5COlogg("!Fatal: mutex unlock failed\n");
S63fbaiducuk5COexit(-2);
S63fbaiducuk5CO}
S63fbaiducuk5CO
NULL;
S63fbaiducuk5CO}
/*
 * Hand a job to the pool.
 *
 * threadpool: pool created by thrmgr_new().
 * user_data:  opaque pointer later passed to the pool's handler.
 *
 * The job is appended to the pool's queue. A new worker thread is started
 * only when no worker is idle and fewer than thr_max workers are alive.
 * pool_cond is then signalled so that a waiting worker, if any, picks the
 * job up.
 *
 * Returns TRUE when the job was queued. Returns FALSE when threadpool is
 * NULL, the pool is not in state POOL_VALID, or the mutex cannot be locked
 * or unlocked. If only the final unlock fails, the job has already been
 * queued even though FALSE is returned.
 */
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
{
	pthread_t thr_id;

	if (!threadpool) {
		return FALSE;
	}

	/* Lock the threadpool */
	if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
		logg("!Mutex lock failed\n");
		return FALSE;
	}

	if (threadpool->state != POOL_VALID) {
		if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
			logg("!Mutex unlock failed\n");
			return FALSE;
		}
		return FALSE;
	}
	work_queue_add(threadpool->queue, user_data);

	/* Create a new thread only when no worker is idle and the number of
	 * live workers is still below the configured maximum. */
	if ((threadpool->thr_idle == 0) &&
			(threadpool->thr_alive < threadpool->thr_max)) {
		/* Start a new thread */
		if (pthread_create(&thr_id, &(threadpool->pool_attr),
				thrmgr_worker, threadpool) != 0) {
			/* NOTE(review): the job stays queued and TRUE is still
			 * returned; it only runs once some worker exists. */
			logg("!pthread_create failed\n");
		} else {
			threadpool->thr_alive++;
		}
	}
	/* Wake one worker waiting on the condition, if there is one. */
	pthread_cond_signal(&(threadpool->pool_cond));

	if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
		logg("!Mutex unlock failed\n");
		return FALSE;
	}
	return TRUE;
}
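/*
 * Usage sketch, taking a TCP server as an example (simplified and possibly
 * flawed -- corrections are welcome):
 *
 *   1. Create the pool once:
 *          pool = thrmgr_new(max_threads, idle_timeout, handler);
 *   2. Serve connections:
 *          while (1) {
 *              accept(......);
 *              // build the argument for the handler
 *              thrmgr_dispatch(pool, arg);
 *          }
 *          thrmgr_destroy(pool);
 */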
S63fbaiducuk5CO
=posted>Posted by rui at August 21, 2005 0