概述
freeswitch的核心源代码是基于apr库开发的,在不同的系统上有很好的移植性,
APR库在之前的文章中已经介绍过了,APR-UTIL库是和APR并列的工具库,它们都是由APACHE开源出来的跨平台可移植库,不同点在于库中实作的功能界面有区别,
在应用的开发程序中,多执行绪并发是提高效率的常用方案,但是多执行绪管理并不好做,
在很多大型应用中,都会引入执行绪池的框架,执行绪池是一个执行绪集合,有统一的管理,当有一个新的任务下发,执行绪池管理会按照一定的策略将任务分配给空闲的执行绪,当任务积压较多时,执行绪池会创建新的执行绪来加快处理效率,
APR-UTIL库中就提供了一套执行绪池界面,
我对几个问题比较好奇,执行绪池如何管理?执行绪池什么情况要增加执行绪?什么情况会减少执行绪?执行绪池执行绪数目如何设定才有最优的效率?
下面我们对apr-util库的执行绪池实作做一个介绍,
环境
centos:CentOS release 7.0 (Final)或以上版本
APR-UTIL:1.6.1
GCC:4.8.5
本来要使用freeswitch1.8.7中带的apr-util库源代码来梳理,但是很遗憾的是这个apr-util库版本是1.2.8,里面没有apr_thread_pool界面,,,所以从APR官网上下载了最新的1.6.1版本来做分析,
资料结构
apr执行绪池源档案:
apr-util-1.6.1\include\apr_thread_pool.h
apr-util-1.6.1\misc\apr_thread_pool.c
号码池资料结构定义在apr_thread_pool.c中
typedef struct apr_thread_pool_task
{
APR_RING_ENTRY(apr_thread_pool_task) link;
apr_thread_start_t func;
void *param;
void *owner;
union
{
apr_byte_t priority;
apr_time_t time;
} dispatch;
} apr_thread_pool_task_t;
APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
struct apr_thread_list_elt
{
APR_RING_ENTRY(apr_thread_list_elt) link;
apr_thread_t *thd;
volatile void *current_owner;
volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
};
APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
struct apr_thread_pool
{
apr_pool_t *pool;
volatile apr_size_t thd_max;
volatile apr_size_t idle_max;
volatile apr_interval_time_t idle_wait;
volatile apr_size_t thd_cnt;
volatile apr_size_t idle_cnt;
volatile apr_size_t task_cnt;
volatile apr_size_t scheduled_task_cnt;
volatile apr_size_t threshold;
volatile apr_size_t tasks_run;
volatile apr_size_t tasks_high;
volatile apr_size_t thd_high;
volatile apr_size_t thd_timed_out;
struct apr_thread_pool_tasks *tasks;
struct apr_thread_pool_tasks *scheduled_tasks;
struct apr_thread_list *busy_thds;
struct apr_thread_list *idle_thds;
apr_thread_mutex_t *lock;
apr_thread_cond_t *cond;
volatile int terminated;
struct apr_thread_pool_tasks *recycled_tasks;
struct apr_thread_list *recycled_thds;
apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
};
执行绪池存储器模型总图,执行绪池,任务队列,执行绪队列,
常用函式
常用函式界面
apr_thread_pool_create //Create a thread pool
apr_thread_pool_destroy //Destroy the thread pool and stop all the threads
apr_thread_pool_push //Schedule a task to the bottom of the tasks of same priority.
apr_thread_pool_schedule //Schedule a task to be run after a delay
apr_thread_pool_top //Schedule a task to the top of the tasks of same priority.
apr_thread_pool_tasks_cancel //Cancel tasks submitted by the owner. If there is any task from the owner that is currently running, the function will spin until the task finished.
apr_thread_pool_tasks_count //Get the current number of tasks waiting in the queue
apr_thread_pool_scheduled_tasks_count //Get the current number of scheduled tasks waiting in the queue
apr_thread_pool_threads_count //Get the current number of threads
apr_thread_pool_busy_count //Get the current number of busy threads
apr_thread_pool_idle_count //Get the current number of idle threads
apr_thread_pool_idle_max_set //Access function for the maximum number of idle threads. Number of current idle threads will be reduced to the new limit.
apr_thread_pool_tasks_run_count //Get number of tasks that have run
apr_thread_pool_tasks_high_count //Get high water mark of the number of tasks waiting to run
apr_thread_pool_threads_high_count //Get high water mark of the number of threads
apr_thread_pool_threads_idle_timeout_count //Get the number of idle threads that were destroyed after timing out
apr_thread_pool_idle_max_get //Access function for the maximum number of idle threads
apr_thread_pool_thread_max_set //Access function for the maximum number of threads.
apr_thread_pool_idle_wait_set //Access function for the maximum wait time (in microseconds) of an idling thread that exceeds the maximum number of idling threads. A non-zero value allows for the reaping of idling threads to shrink over time. Which helps reduce thrashing.
apr_thread_pool_idle_wait_get //Access function for the maximum wait time (in microseconds) of an idling thread that exceeds the maximum number of idling threads
apr_thread_pool_thread_max_get //Access function for the maximum number of threads
apr_thread_pool_threshold_set //Access function for the threshold of tasks in queue to trigger a new thread.
apr_thread_pool_threshold_get //Access function for the threshold of tasks in queue to trigger a new thread.
apr_thread_pool_task_owner_get //Get owner of the task currently been executed by the thread.
apr_thread_pool_create创建
APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
apr_size_t init_threads,
apr_size_t max_threads,
apr_pool_t * pool)
界面逻辑:
- 分配一块大小为apr_thread_pool_t的存储器tp,
- 在传入的存储器池pool中申请一个新的存储器池tp->pool,
- 初始化执行绪池资料,
a) 执行绪池资料初始化,
b) 创建执行绪互斥锁me->lock,
c) 创建条件变量me->cond,
d) 在存储器池pool上分配一块大小为“apr_thread_pool_tasks“的存储器赋值给me->tasks,
e) 在存储器池pool上分配一块大小为“apr_thread_pool_tasks“的存储器赋值给me->scheduled_tasks,
f) 在存储器池pool上分配一块大小为“apr_thread_pool_tasks“的存储器赋值给me->recycled_tasks,
g) 在存储器池pool上分配一块大小为“apr_thread_list“的存储器赋值给me->busy_thds,
h) 在存储器池pool上分配一块大小为“apr_thread_list“的存储器赋值给me->idle_thds,
i) 在存储器池pool上分配一块大小为“apr_thread_list“的存储器赋值给me->recycled_thds,
j) 执行绪池资料初始化,
- 在存储器池tp->pool中注册清理回呼函式,
- 回圈创建初始作业执行绪,并加入执行绪池的管理,作业执行绪的逻辑见“thread_pool_func作业执行绪”,
- 回传创建结果,
执行绪池初始化成功后,存储器模型如图(作业执行绪启动未完成时)
thread_pool_func作业执行绪
static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
界面逻辑:
- 加锁me->lock
- 判断me->recycled_thds链表为空?为空则创建新的apr_thread_list_elt节点elt,不为空则获取recycled_thds中首节点elt并从recycled_thds中移除该节点,
- 回圈处理,
a) 将elt节点加入me->busy_thds链表,
b) 获取一个新任务task,TODO
c) 回圈处理,解锁me->lock,呼叫任务回呼task->func,加锁me->lock,将task加入me->recycled_tasks链表,获取新任务task,执行绪状态置为TH_STOP时跳出回圈,获取任务为空跳出回圈,
d) 执行绪从busy到stop状态,将elt加入me->recycled_thds链表尾部,解锁me->lock,退出执行绪,
e) 执行绪从busy到idle状态,将elt节点从me->busy_thds链表中移除,将elt加入me->idle_thds链表尾部,
f) 检查是否有定时任务并获取任务执行等待时间,
g) 检查当前空闲执行绪数是否大于最大空闲数,获取空闲等待时间me->idle_wait,并设定当前执行绪状态为TH_PROBATION,下一轮回圈中进入stop处理流程,
h) 执行绪阻塞,等待条件变量me->cond的通知或超时,
- 执行绪数me->thd_cnt自减,
- 解锁me->lock,
- 退出执行绪,
执行绪池初始化成功后,存储器模型如图(作业执行绪启动完成时)
apr_thread_pool_push添加任务
APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
apr_thread_start_t func,
void *param,
apr_byte_t priority,
void *owner)
界面逻辑:
- 加锁me->lock,
- 检查me->recycled_tasks是否为空,为空则新建任务节点t,不为空则从me->recycled_tasks获取任务节点t,
- 任务节点t资料初始化,
- 计算任务优先级,根据优先级设定me->task_idx[seg]和me->tasks,
- 当前作业执行绪数为0时,或者空闲执行绪数为0并且当前执行绪数未达到最大并且当前任务数超过阈值等条件,动态创建新的作业执行绪,
- 对条件变量me->cond发通知,
- 解锁me->lock,
执行绪池添加任务后的存储器模型图,
apr_thread_pool_tasks_cancel取消任务
APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
void *owner)
界面逻辑:
- 加锁me->lock,
- 如果当前任务数大于0,则清空owner的所有任务,
- 如果定时任务数大于0,则清空owner的所有定时任务,
- 解锁me->lock,
- 等待执行绪退出,
总结
APR执行绪池的几个关注点,
执行绪从busy到stop状态时,没有将elt节点从me->busy_thds链表中洗掉?
APR执行绪池没有内置的管理执行绪,根据当前执行绪数和任务数进行动态的调整,而是通过任务阈值、空闲执行绪最大值和超时时间等设定来控制执行绪数的增减,这一点和我开始想的不一样,
空空如常
求真得真
0 评论