asterisk/main/threadpool.c

1437 lines
39 KiB
C

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2012-2013, Digium, Inc.
*
* Mark Michelson <mmmichelson@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"
/* Needs to stay prime if increased */
#define THREAD_BUCKETS 89
/*!
* \brief An opaque threadpool structure
*
* A threadpool is a collection of threads that execute
* tasks from a common queue.
*/
struct ast_threadpool {
/*! Threadpool listener */
struct ast_threadpool_listener *listener;
/*!
* \brief The container of active threads.
* Active threads are those that are currently running tasks
*/
struct ao2_container *active_threads;
/*!
* \brief The container of idle threads.
* Idle threads are those that are currently waiting to run tasks
*/
struct ao2_container *idle_threads;
/*!
* \brief The container of zombie threads.
* Zombie threads may be running tasks, but they are scheduled to die soon
*/
struct ao2_container *zombie_threads;
/*!
* \brief The main taskprocessor
*
* Tasks that are queued in this taskprocessor are
* doled out to the worker threads. Worker threads that
* execute tasks from the threadpool are executing tasks
* in this taskprocessor.
*
* The threadpool itself is actually the private data for
* this taskprocessor's listener. This way, as taskprocessor
* changes occur, the threadpool can alert its listeners
* appropriately.
*/
struct ast_taskprocessor *tps;
/*!
* \brief The control taskprocessor
*
* This is a standard taskprocessor that uses the default
* taskprocessor listener. In other words, all tasks queued to
* this taskprocessor have a single thread that executes the
* tasks.
*
* All tasks that modify the state of the threadpool and all tasks
* that call out to threadpool listeners are pushed to this
* taskprocessor.
*
* For instance, when the threadpool changes sizes, a task is put
* into this taskprocessor to do so. When it comes time to tell the
* threadpool listener that worker threads have changed state,
* the task is placed in this taskprocessor.
*
* This is done for three main reasons
* 1) It ensures that listeners are given an accurate portrayal
* of the threadpool's current state. In other words, when a listener
* gets told a count of active, idle and zombie threads, it does not
* need to worry that internal state of the threadpool might be different
* from what it has been told.
* 2) It minimizes the locking required in both the threadpool and in
* threadpool listener's callbacks.
* 3) It ensures that listener callbacks are called in the same order
* that the threadpool had its state change.
*/
struct ast_taskprocessor *control_tps;
/*! True if the threadpool is in the process of shutting down */
int shutting_down;
/*! Threadpool-specific options */
struct ast_threadpool_options options;
};
/*!
* \brief listener for a threadpool
*
* The listener is notified of changes in a threadpool. It can
* react by doing things like increasing the number of threads
* in the pool
*/
struct ast_threadpool_listener {
/*! Callbacks called by the threadpool */
const struct ast_threadpool_listener_callbacks *callbacks;
/*! User data for the listener */
void *user_data;
};
/*!
* \brief states for worker threads
*/
enum worker_state {
/*! The worker is either active or idle */
ALIVE,
/*!
* The worker has been asked to shut down but
* may still be in the process of executing tasks.
* This transition happens when the threadpool needs
* to shrink and needs to kill active threads in order
* to do so.
*/
ZOMBIE,
/*!
* The worker has been asked to shut down. Typically
* only idle threads go to this state directly, but
* active threads may go straight to this state when
* the threadpool is shut down.
*/
DEAD,
};
/*!
* A thread that executes threadpool tasks
*/
struct worker_thread {
/*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
int id;
/*! Condition used in conjunction with state changes */
ast_cond_t cond;
/*! Lock used alongside the condition for state changes */
ast_mutex_t lock;
/*! The actual thread that is executing tasks */
pthread_t thread;
/*! A pointer to the threadpool. Needed to be able to execute tasks */
struct ast_threadpool *pool;
/*! The current state of the worker thread */
enum worker_state state;
/*! A boolean used to determine if an idle thread should become active */
int wake_up;
/*! Options for this threadpool */
struct ast_threadpool_options options;
};
/* Worker thread forward declarations. See definitions for documentation */
static int worker_thread_hash(const void *obj, int flags);
static int worker_thread_cmp(void *obj, void *arg, int flags);
static void worker_thread_destroy(void *obj);
static void worker_active(struct worker_thread *worker);
static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
/*!
* \brief Notify the threadpool listener that the state has changed.
*
* This notifies the threadpool listener via its state_changed callback.
* \param pool The threadpool whose state has changed
*/
static void threadpool_send_state_changed(struct ast_threadpool *pool)
{
int active_size = ao2_container_count(pool->active_threads);
int idle_size = ao2_container_count(pool->idle_threads);
if (pool->listener && pool->listener->callbacks->state_changed) {
pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
}
}
/*!
* \brief Struct used for queued operations involving worker state changes
*/
struct thread_worker_pair {
/*! Threadpool that contains the worker whose state has changed */
struct ast_threadpool *pool;
/*! Worker whose state has changed */
struct worker_thread *worker;
};
/*!
* \brief Destructor for thread_worker_pair
*/
static void thread_worker_pair_free(struct thread_worker_pair *pair)
{
ao2_ref(pair->worker, -1);
ast_free(pair);
}
/*!
* \brief Allocate and initialize a thread_worker_pair
* \param pool Threadpool to assign to the thread_worker_pair
* \param worker Worker thread to assign to the thread_worker_pair
*/
static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
struct worker_thread *worker)
{
struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
if (!pair) {
return NULL;
}
pair->pool = pool;
ao2_ref(worker, +1);
pair->worker = worker;
return pair;
}
/*!
* \brief Move a worker thread from the active container to the idle container.
*
* This function is called from the threadpool's control taskprocessor thread.
* \param data A thread_worker_pair containing the threadpool and the worker to move.
* \return 0
*/
static int queued_active_thread_idle(void *data)
{
struct thread_worker_pair *pair = data;
ao2_link(pair->pool->idle_threads, pair->worker);
ao2_unlink(pair->pool->active_threads, pair->worker);
threadpool_send_state_changed(pair->pool);
thread_worker_pair_free(pair);
return 0;
}
/*!
* \brief Queue a task to move a thread from the active list to the idle list
*
* This is called by a worker thread when it runs out of tasks to perform and
* goes idle.
* \param pool The threadpool to which the worker belongs
* \param worker The worker thread that has gone idle
*/
static void threadpool_active_thread_idle(struct ast_threadpool *pool,
struct worker_thread *worker)
{
struct thread_worker_pair *pair;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
if (ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair)) {
thread_worker_pair_free(pair);
}
}
/*!
* \brief Kill a zombie thread
*
* This runs from the threadpool's control taskprocessor thread.
*
* \param data A thread_worker_pair containing the threadpool and the zombie thread
* \return 0
*/
static int queued_zombie_thread_dead(void *data)
{
struct thread_worker_pair *pair = data;
ao2_unlink(pair->pool->zombie_threads, pair->worker);
threadpool_send_state_changed(pair->pool);
thread_worker_pair_free(pair);
return 0;
}
/*!
* \brief Queue a task to kill a zombie thread
*
* This is called by a worker thread when it acknowledges that it is time for
* it to die.
*/
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
struct worker_thread *worker)
{
struct thread_worker_pair *pair;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
if (ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair)) {
thread_worker_pair_free(pair);
}
}
static int queued_idle_thread_dead(void *data)
{
struct thread_worker_pair *pair = data;
ao2_unlink(pair->pool->idle_threads, pair->worker);
threadpool_send_state_changed(pair->pool);
thread_worker_pair_free(pair);
return 0;
}
static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
struct worker_thread *worker)
{
struct thread_worker_pair *pair;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
if (ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair)) {
thread_worker_pair_free(pair);
}
}
/*!
* \brief Execute a task in the threadpool
*
* This is the function that worker threads call in order to execute tasks
* in the threadpool
*
* \param pool The pool to which the tasks belong.
* \retval 0 Either the pool has been shut down or there are no tasks.
* \retval 1 There are still tasks remaining in the pool.
*/
static int threadpool_execute(struct ast_threadpool *pool)
{
ao2_lock(pool);
if (!pool->shutting_down) {
ao2_unlock(pool);
return ast_taskprocessor_execute(pool->tps);
}
ao2_unlock(pool);
return 0;
}
/*!
* \brief Destroy a threadpool's components.
*
* This is the destructor called automatically when the threadpool's
* reference count reaches zero. This is not to be confused with
* threadpool_destroy.
*
* By the time this actually gets called, most of the cleanup has already
* been done in the pool. The only thing left to do is to release the
* final reference to the threadpool listener.
*
* \param obj The pool to destroy
*/
static void threadpool_destructor(void *obj)
{
struct ast_threadpool *pool = obj;
ao2_cleanup(pool->listener);
}
/*!
* \brief Allocate a threadpool
*
* This is implemented as a taskprocessor listener's alloc callback. This
* is because the threadpool exists as the private data on a taskprocessor
* listener.
*
* \param name The name of the threadpool.
* \param options The options the threadpool uses.
* \retval NULL Could not initialize threadpool properly
* \retval non-NULL The newly-allocated threadpool
*/
static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
{
RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
struct ast_str *control_tps_name;
pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
control_tps_name = ast_str_create(64);
if (!pool || !control_tps_name) {
ast_free(control_tps_name);
return NULL;
}
ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
ast_free(control_tps_name);
if (!pool->control_tps) {
return NULL;
}
pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
if (!pool->active_threads) {
return NULL;
}
pool->idle_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
if (!pool->idle_threads) {
return NULL;
}
pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
if (!pool->zombie_threads) {
return NULL;
}
pool->options = *options;
ao2_ref(pool, +1);
return pool;
}
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
{
return 0;
}
/*!
* \brief helper used for queued task when tasks are pushed
*/
struct task_pushed_data {
/*! Pool into which a task was pushed */
struct ast_threadpool *pool;
/*! Indicator of whether the pool had no tasks prior to the new task being added */
int was_empty;
};
/*!
* \brief Allocate and initialize a task_pushed_data
* \param pool The threadpool to set in the task_pushed_data
* \param was_empty The was_empty value to set in the task_pushed_data
* \retval NULL Unable to allocate task_pushed_data
* \retval non-NULL The newly-allocated task_pushed_data
*/
static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
int was_empty)
{
struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
if (!tpd) {
return NULL;
}
tpd->pool = pool;
tpd->was_empty = was_empty;
return tpd;
}
/*!
* \brief Activate idle threads
*
* This function always returns CMP_MATCH because all workers that this
* function acts on need to be seen as matches so they are unlinked from the
* list of idle threads.
*
* Called as an ao2_callback in the threadpool's control taskprocessor thread.
* \param obj The worker to activate
* \param arg The pool where the worker belongs
* \param flags
* \retval CMP_MATCH
*/
static int activate_thread(void *obj, void *arg, int flags)
{
struct worker_thread *worker = obj;
struct ast_threadpool *pool = arg;
if (!ao2_link(pool->active_threads, worker)) {
/* If we can't link the idle thread into the active container, then
* we'll just leave the thread idle and not wake it up.
*/
ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
worker->id);
return 0;
}
if (worker_set_state(worker, ALIVE)) {
ast_debug(1, "Failed to activate thread %d. It is dead\n",
worker->id);
/* The worker thread will no longer exist in the active threads or
* idle threads container after this.
*/
ao2_unlink(pool->active_threads, worker);
}
return CMP_MATCH;
}
/*!
* \brief Add threads to the threadpool
*
* This function is called from the threadpool's control taskprocessor thread.
* \param pool The pool that is expanding
* \param delta The number of threads to add to the pool
*/
static void grow(struct ast_threadpool *pool, int delta)
{
int i;
int current_size = ao2_container_count(pool->active_threads) +
ao2_container_count(pool->idle_threads);
if (pool->options.max_size && current_size + delta > pool->options.max_size) {
delta = pool->options.max_size - current_size;
}
ast_debug(3, "Increasing threadpool %s's size by %d\n",
ast_taskprocessor_name(pool->tps), delta);
for (i = 0; i < delta; ++i) {
struct worker_thread *worker = worker_thread_alloc(pool);
if (!worker) {
return;
}
if (ao2_link(pool->idle_threads, worker)) {
if (worker_thread_start(worker)) {
ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
ao2_unlink(pool->active_threads, worker);
}
} else {
ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
}
ao2_ref(worker, -1);
}
}
/*!
* \brief Queued task called when tasks are pushed into the threadpool
*
* This function first calls into the threadpool's listener to let it know
* that a task has been pushed. It then wakes up all idle threads and moves
* them into the active thread container.
* \param data A task_pushed_data
* \return 0
*/
static int queued_task_pushed(void *data)
{
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
unsigned int existing_active;
ast_free(tpd);
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
existing_active = ao2_container_count(pool->active_threads);
/* The first pass transitions any existing idle threads to be active, and
* will also remove any worker threads that have recently entered the dead
* state.
*/
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
activate_thread, pool);
/* If no idle threads could be transitioned to active grow the pool as permitted. */
if (ao2_container_count(pool->active_threads) == existing_active) {
if (!pool->options.auto_increment) {
return 0;
}
grow(pool, pool->options.auto_increment);
/* An optional second pass transitions any newly added threads. */
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
activate_thread, pool);
}
threadpool_send_state_changed(pool);
return 0;
}
/*!
* \brief Taskprocessor listener callback called when a task is added
*
* The threadpool uses this opportunity to queue a task on its control taskprocessor
* in order to activate idle threads and notify the threadpool listener that the
* task has been pushed.
* \param listener The taskprocessor listener. The threadpool is the listener's private data
* \param was_empty True if the taskprocessor was empty prior to the task being pushed
*/
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
int was_empty)
{
struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
struct task_pushed_data *tpd;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
tpd = task_pushed_data_alloc(pool, was_empty);
if (!tpd) {
return;
}
if (ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd)) {
ast_free(tpd);
}
}
/*!
* \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
*
* This simply lets the threadpool's listener know that the threadpool is devoid of tasks
* \param data The pool that has become empty
* \return 0
*/
static int queued_emptied(void *data)
{
struct ast_threadpool *pool = data;
/* We already checked for existence of this callback when this was queued */
pool->listener->callbacks->emptied(pool, pool->listener);
return 0;
}
/*!
* \brief Taskprocessor listener emptied callback
*
* The threadpool queues a task to let the threadpool listener know that
* the threadpool no longer contains any tasks.
* \param listener The taskprocessor listener. The threadpool is the listener's private data.
*/
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
if (pool->listener && pool->listener->callbacks->emptied) {
if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) {
/* Nothing to do here but we need the check to keep the compiler happy. */
}
}
}
/*!
* \brief Taskprocessor listener shutdown callback
*
* The threadpool will shut down and destroy all of its worker threads when
* this is called back. By the time this gets called, the taskprocessor's
* control taskprocessor has already been destroyed. Therefore there is no risk
* in outright destroying the worker threads here.
* \param listener The taskprocessor listener. The threadpool is the listener's private data.
*/
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{
struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
if (pool->listener && pool->listener->callbacks->shutdown) {
pool->listener->callbacks->shutdown(pool->listener);
}
ao2_cleanup(pool->active_threads);
ao2_cleanup(pool->idle_threads);
ao2_cleanup(pool->zombie_threads);
ao2_cleanup(pool);
}
/*!
* \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
*/
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
.start = threadpool_tps_start,
.task_pushed = threadpool_tps_task_pushed,
.emptied = threadpool_tps_emptied,
.shutdown = threadpool_tps_shutdown,
};
/*!
* \brief ao2 callback to kill a set number of threads.
*
* Threads will be unlinked from the container as long as the
* counter has not reached zero. The counter is decremented with
* each thread that is removed.
* \param obj The worker thread up for possible destruction
* \param arg The counter
* \param flags Unused
* \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
* \retval CMP_STOP The counter has reached zero so no more threads should be removed.
*/
static int kill_threads(void *obj, void *arg, int flags)
{
int *num_to_kill = arg;
if (*num_to_kill > 0) {
--(*num_to_kill);
return CMP_MATCH;
} else {
return CMP_STOP;
}
}
/*!
* \brief ao2 callback to zombify a set number of threads.
*
* Threads will be zombified as long as the counter has not reached
* zero. The counter is decremented with each thread that is zombified.
*
* Zombifying a thread involves removing it from its current container,
* adding it to the zombie container, and changing the state of the
* worker to a zombie
*
* This callback is called from the threadpool control taskprocessor thread.
*
* \param obj The worker thread that may be zombified
* \param arg The pool to which the worker belongs
* \param data The counter
* \param flags Unused
* \retval CMP_MATCH The zombified thread should be removed from its current container
* \retval CMP_STOP Stop attempting to zombify threads
*/
static int zombify_threads(void *obj, void *arg, void *data, int flags)
{
struct worker_thread *worker = obj;
struct ast_threadpool *pool = arg;
int *num_to_zombify = data;
if ((*num_to_zombify)-- > 0) {
if (!ao2_link(pool->zombie_threads, worker)) {
ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
return 0;
}
worker_set_state(worker, ZOMBIE);
return CMP_MATCH;
} else {
return CMP_STOP;
}
}
/*!
* \brief Remove threads from the threadpool
*
* The preference is to kill idle threads. However, if there are
* more threads to remove than there are idle threads, then active
* threads will be zombified instead.
*
* This function is called from the threadpool control taskprocessor thread.
*
* \param pool The threadpool to remove threads from
* \param delta The number of threads to remove
*/
static void shrink(struct ast_threadpool *pool, int delta)
{
/*
* Preference is to kill idle threads, but
* we'll move on to deactivating active threads
* if we have to
*/
int idle_threads = ao2_container_count(pool->idle_threads);
int idle_threads_to_kill = MIN(delta, idle_threads);
int active_threads_to_zombify = delta - idle_threads_to_kill;
ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
ast_taskprocessor_name(pool->tps));
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
kill_threads, &idle_threads_to_kill);
ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
ast_taskprocessor_name(pool->tps));
ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
zombify_threads, pool, &active_threads_to_zombify);
}
/*!
* \brief Helper struct used for queued operations that change the size of the threadpool
*/
struct set_size_data {
/*! The pool whose size is to change */
struct ast_threadpool *pool;
/*! The requested new size of the pool */
unsigned int size;
};
/*!
* \brief Allocate and initialize a set_size_data
* \param pool The pool for the set_size_data
* \param size The size to store in the set_size_data
*/
static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
unsigned int size)
{
struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
if (!ssd) {
return NULL;
}
ssd->pool = pool;
ssd->size = size;
return ssd;
}
/*!
* \brief Change the size of the threadpool
*
* This can either result in shrinking or growing the threadpool depending
* on the new desired size and the current size.
*
* This function is run from the threadpool control taskprocessor thread
*
* \param data A set_size_data used for determining how to act
* \return 0
*/
static int queued_set_size(void *data)
{
struct set_size_data *ssd = data;
struct ast_threadpool *pool = ssd->pool;
unsigned int num_threads = ssd->size;
/* We don't count zombie threads as being "live" when potentially resizing */
unsigned int current_size = ao2_container_count(pool->active_threads) +
ao2_container_count(pool->idle_threads);
ast_free(ssd);
if (current_size == num_threads) {
ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
num_threads, current_size);
return 0;
}
if (current_size < num_threads) {
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_thread, pool);
/* As the above may have altered the number of current threads update it */
current_size = ao2_container_count(pool->active_threads) +
ao2_container_count(pool->idle_threads);
grow(pool, num_threads - current_size);
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_thread, pool);
} else {
shrink(pool, current_size - num_threads);
}
threadpool_send_state_changed(pool);
return 0;
}
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
{
struct set_size_data *ssd;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
ssd = set_size_data_alloc(pool, size);
if (!ssd) {
return;
}
if (ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd)) {
ast_free(ssd);
}
}
struct ast_threadpool_listener *ast_threadpool_listener_alloc(
const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
{
struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
if (!listener) {
return NULL;
}
listener->callbacks = callbacks;
listener->user_data = user_data;
return listener;
}
void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
{
return listener->user_data;
}
struct pool_options_pair {
struct ast_threadpool *pool;
struct ast_threadpool_options options;
};
struct ast_threadpool *ast_threadpool_create(const char *name,
struct ast_threadpool_listener *listener,
const struct ast_threadpool_options *options)
{
struct ast_taskprocessor *tps;
RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
char *fullname;
pool = threadpool_alloc(name, options);
if (!pool) {
return NULL;
}
tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
if (!tps_listener) {
return NULL;
}
if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
return NULL;
}
fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
sprintf(fullname, "%s/pool", name); /* Safe */
tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
if (!tps) {
return NULL;
}
pool->tps = tps;
if (listener) {
ao2_ref(listener, +1);
pool->listener = listener;
}
ast_threadpool_set_size(pool, pool->options.initial_size);
ao2_ref(pool, +1);
return pool;
}
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
{
SCOPED_AO2LOCK(lock, pool);
if (!pool->shutting_down) {
return ast_taskprocessor_push(pool->tps, task, data);
}
return -1;
}
void ast_threadpool_shutdown(struct ast_threadpool *pool)
{
if (!pool) {
return;
}
/* Shut down the taskprocessors and everything else just
* takes care of itself via the taskprocessor callbacks
*/
ao2_lock(pool);
pool->shutting_down = 1;
ao2_unlock(pool);
ast_taskprocessor_unreference(pool->control_tps);
ast_taskprocessor_unreference(pool->tps);
}
/*!
* A monotonically increasing integer used for worker
* thread identification.
*/
static int worker_id_counter;
static int worker_thread_hash(const void *obj, int flags)
{
const struct worker_thread *worker = obj;
return worker->id;
}
static int worker_thread_cmp(void *obj, void *arg, int flags)
{
struct worker_thread *worker1 = obj;
struct worker_thread *worker2 = arg;
return worker1->id == worker2->id ? CMP_MATCH : 0;
}
/*!
* \brief shut a worker thread down
*
* Set the worker dead and then wait for its thread
* to finish executing.
*
* \param worker The worker thread to shut down
*/
static void worker_shutdown(struct worker_thread *worker)
{
worker_set_state(worker, DEAD);
if (worker->thread != AST_PTHREADT_NULL) {
pthread_join(worker->thread, NULL);
worker->thread = AST_PTHREADT_NULL;
}
}
/*!
* \brief Worker thread destructor
*
* Called automatically when refcount reaches 0. Shuts
* down the worker thread and destroys its component
* parts
*/
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
ast_debug(3, "Destroying worker thread %d\n", worker->id);
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
}
/*!
* \brief start point for worker threads
*
* Worker threads start in the active state but may
* immediately go idle if there is no work to be
* done
*
* \param arg The worker thread
*/
static void *worker_start(void *arg)
{
struct worker_thread *worker = arg;
enum worker_state saved_state;
if (worker->options.thread_start) {
worker->options.thread_start();
}
ast_mutex_lock(&worker->lock);
while (worker_idle(worker)) {
ast_mutex_unlock(&worker->lock);
worker_active(worker);
ast_mutex_lock(&worker->lock);
if (worker->state != ALIVE) {
break;
}
threadpool_active_thread_idle(worker->pool, worker);
}
saved_state = worker->state;
ast_mutex_unlock(&worker->lock);
/* Reaching this portion means the thread is
* on death's door. It may have been killed while
* it was idle, in which case it can just die
* peacefully. If it's a zombie, though, then
* it needs to let the pool know so
* that the thread can be removed from the
* list of zombie threads.
*/
if (saved_state == ZOMBIE) {
threadpool_zombie_thread_dead(worker->pool, worker);
}
if (worker->options.thread_end) {
worker->options.thread_end();
}
return NULL;
}
/*!
* \brief Allocate and initialize a new worker thread
*
* This will create, initialize, and start the thread.
*
* \param pool The threadpool to which the worker will be added
* \retval NULL Failed to allocate or start the worker thread
* \retval non-NULL The newly-created worker thread
*/
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
{
struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
if (!worker) {
return NULL;
}
worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
ast_mutex_init(&worker->lock);
ast_cond_init(&worker->cond, NULL);
worker->pool = pool;
worker->thread = AST_PTHREADT_NULL;
worker->state = ALIVE;
worker->options = pool->options;
return worker;
}
static int worker_thread_start(struct worker_thread *worker)
{
return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
}
/*!
* \brief Active loop for worker threads
*
* The worker will stay in this loop for its lifetime,
* executing tasks as they become available. If there
* are no tasks currently available, then the thread
* will go idle.
*
* \param worker The worker thread executing tasks.
*/
static void worker_active(struct worker_thread *worker)
{
int alive;
/* The following is equivalent to
*
* while (threadpool_execute(worker->pool));
*
* However, reviewers have suggested in the past
* doing that can cause optimizers to (wrongly)
* optimize the code away.
*/
do {
alive = threadpool_execute(worker->pool);
} while (alive);
}
/*!
* \brief Idle function for worker threads
*
* The worker waits here until it gets told by the threadpool
* to wake up.
*
* worker is locked before entering this function.
*
* \param worker The idle worker
* \retval 0 The thread is being woken up so that it can conclude.
* \retval non-zero The thread is being woken up to do more work.
*/
static int worker_idle(struct worker_thread *worker)
{
struct timeval start = ast_tvnow();
struct timespec end = {
.tv_sec = start.tv_sec + worker->options.idle_timeout,
.tv_nsec = start.tv_usec * 1000,
};
while (!worker->wake_up) {
if (worker->options.idle_timeout <= 0) {
ast_cond_wait(&worker->cond, &worker->lock);
} else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
break;
}
}
if (!worker->wake_up) {
ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
threadpool_idle_thread_dead(worker->pool, worker);
worker->state = DEAD;
}
worker->wake_up = 0;
return worker->state == ALIVE;
}
/*!
* \brief Change a worker's state
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed.
*
* \retval -1 failure (state transition not permitted)
* \retval 0 success
*/
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
switch (state) {
case ALIVE:
/* This can occur due to a race condition between being told to go active
* and an idle timeout happening.
*/
if (worker->state == DEAD) {
return -1;
}
ast_assert(worker->state != ZOMBIE);
break;
case DEAD:
break;
case ZOMBIE:
ast_assert(worker->state != DEAD);
break;
}
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
return 0;
}
/*! Serializer group shutdown control object. */
struct ast_serializer_shutdown_group {
/*! Shutdown thread waits on this conditional. */
ast_cond_t cond;
/*! Count of serializers needing to shutdown. */
int count;
};
static void serializer_shutdown_group_dtor(void *vdoomed)
{
struct ast_serializer_shutdown_group *doomed = vdoomed;
ast_cond_destroy(&doomed->cond);
}
struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
{
struct ast_serializer_shutdown_group *shutdown_group;
shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
if (!shutdown_group) {
return NULL;
}
ast_cond_init(&shutdown_group->cond, NULL);
return shutdown_group;
}
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
{
int remaining;
ast_mutex_t *lock;
if (!shutdown_group) {
return 0;
}
lock = ao2_object_get_lockaddr(shutdown_group);
ast_assert(lock != NULL);
ao2_lock(shutdown_group);
if (timeout) {
struct timeval start;
struct timespec end;
start = ast_tvnow();
end.tv_sec = start.tv_sec + timeout;
end.tv_nsec = start.tv_usec * 1000;
while (shutdown_group->count) {
if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
/* Error or timed out waiting for the count to reach zero. */
break;
}
}
} else {
while (shutdown_group->count) {
if (ast_cond_wait(&shutdown_group->cond, lock)) {
/* Error */
break;
}
}
}
remaining = shutdown_group->count;
ao2_unlock(shutdown_group);
return remaining;
}
/*!
* \internal
* \brief Increment the number of serializer members in the group.
* \since 13.5.0
*
* \param shutdown_group Group shutdown controller.
*/
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
{
ao2_lock(shutdown_group);
++shutdown_group->count;
ao2_unlock(shutdown_group);
}
/*!
* \internal
* \brief Decrement the number of serializer members in the group.
* \since 13.5.0
*
* \param shutdown_group Group shutdown controller.
*/
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
{
ao2_lock(shutdown_group);
--shutdown_group->count;
if (!shutdown_group->count) {
ast_cond_signal(&shutdown_group->cond);
}
ao2_unlock(shutdown_group);
}
struct serializer {
/*! Threadpool the serializer will use to process the jobs. */
struct ast_threadpool *pool;
/*! Which group will wait for this serializer to shutdown. */
struct ast_serializer_shutdown_group *shutdown_group;
};
static void serializer_dtor(void *obj)
{
struct serializer *ser = obj;
ao2_cleanup(ser->pool);
ser->pool = NULL;
ao2_cleanup(ser->shutdown_group);
ser->shutdown_group = NULL;
}
static struct serializer *serializer_create(struct ast_threadpool *pool,
struct ast_serializer_shutdown_group *shutdown_group)
{
struct serializer *ser;
ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!ser) {
return NULL;
}
ao2_ref(pool, +1);
ser->pool = pool;
ser->shutdown_group = ao2_bump(shutdown_group);
return ser;
}
AST_THREADSTORAGE_RAW(current_serializer);
static int execute_tasks(void *data)
{
struct ast_taskprocessor *tps = data;
ast_threadstorage_set_ptr(&current_serializer, tps);
while (ast_taskprocessor_execute(tps)) {
/* No-op */
}
ast_threadstorage_set_ptr(&current_serializer, NULL);
ast_taskprocessor_unreference(tps);
return 0;
}
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
if (was_empty) {
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
ast_taskprocessor_unreference(tps);
}
}
}
static int serializer_start(struct ast_taskprocessor_listener *listener)
{
/* No-op */
return 0;
}
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
{
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
if (ser->shutdown_group) {
serializer_shutdown_group_dec(ser->shutdown_group);
}
ao2_cleanup(ser);
}
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
.task_pushed = serializer_task_pushed,
.start = serializer_start,
.shutdown = serializer_shutdown,
};
struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
{
return ast_threadstorage_get_ptr(&current_serializer);
}
struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
{
struct serializer *ser;
struct ast_taskprocessor_listener *listener;
struct ast_taskprocessor *tps;
ser = serializer_create(pool, shutdown_group);
if (!ser) {
return NULL;
}
listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
if (!listener) {
ao2_ref(ser, -1);
return NULL;
}
tps = ast_taskprocessor_create_with_listener(name, listener);
if (!tps) {
/* ser ref transferred to listener but not cleaned without tps */
ao2_ref(ser, -1);
} else if (shutdown_group) {
serializer_shutdown_group_inc(shutdown_group);
}
ao2_ref(listener, -1);
return tps;
}
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
{
return ast_threadpool_serializer_group(name, pool, NULL);
}
long ast_threadpool_queue_size(struct ast_threadpool *pool)
{
return ast_taskprocessor_size(pool->tps);
}