274 lines
6.3 KiB
C
274 lines
6.3 KiB
C
|
/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2013, Digium, Inc.
 *
 * David M. Lee, II <dlee@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */
|
||
|
|
||
|
/*!
 * \file
 * \brief Test infrastructure for dealing with Stasis.
 *
 * \author David M. Lee, II <dlee@digium.com>
 */
|
||
|
|
||
|
/*** MODULEINFO
	<depend>TEST_FRAMEWORK</depend>
	<support_level>core</support_level>
 ***/
|
||
|
|
||
|
#include "asterisk.h"
|
||
|
|
||
|
#include "asterisk/astobj2.h"
|
||
|
#include "asterisk/module.h"
|
||
|
#include "asterisk/stasis_test.h"
|
||
|
|
||
|
/*!
 * \brief Stasis message type used for test messages.
 *
 * It is set up by load_module() through STASIS_MESSAGE_TYPE_INIT(), torn down
 * by unload_module() through STASIS_MESSAGE_TYPE_CLEANUP(), and handed to
 * stasis_message_create() by stasis_test_message_create().
 */
STASIS_MESSAGE_TYPE_DEFN(stasis_test_message_type);
|
||
|
|
||
|
static void stasis_message_sink_dtor(void *obj)
|
||
|
{
|
||
|
struct stasis_message_sink *sink = obj;
|
||
|
|
||
|
{
|
||
|
SCOPED_MUTEX(lock, &sink->lock);
|
||
|
while (!sink->is_done) {
|
||
|
/* Normally waiting forever is bad, but if we're not
|
||
|
* done, we're not done. */
|
||
|
ast_cond_wait(&sink->cond, &sink->lock);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
ast_mutex_destroy(&sink->lock);
|
||
|
ast_cond_destroy(&sink->cond);
|
||
|
|
||
|
while (sink->num_messages > 0) {
|
||
|
ao2_cleanup(sink->messages[--sink->num_messages]);
|
||
|
}
|
||
|
ast_free(sink->messages);
|
||
|
sink->messages = NULL;
|
||
|
sink->max_messages = 0;
|
||
|
}
|
||
|
|
||
|
static struct timespec make_deadline(int timeout_millis)
|
||
|
{
|
||
|
struct timeval start = ast_tvnow();
|
||
|
struct timeval delta = {
|
||
|
.tv_sec = timeout_millis / 1000,
|
||
|
.tv_usec = (timeout_millis % 1000) * 1000,
|
||
|
};
|
||
|
struct timeval deadline_tv = ast_tvadd(start, delta);
|
||
|
struct timespec deadline = {
|
||
|
.tv_sec = deadline_tv.tv_sec,
|
||
|
.tv_nsec = 1000 * deadline_tv.tv_usec,
|
||
|
};
|
||
|
|
||
|
return deadline;
|
||
|
}
|
||
|
|
||
|
struct stasis_message_sink *stasis_message_sink_create(void)
|
||
|
{
|
||
|
RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
|
||
|
|
||
|
sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
|
||
|
if (!sink) {
|
||
|
return NULL;
|
||
|
}
|
||
|
ast_mutex_init(&sink->lock);
|
||
|
ast_cond_init(&sink->cond, NULL);
|
||
|
sink->max_messages = 4;
|
||
|
sink->messages =
|
||
|
ast_malloc(sizeof(*sink->messages) * sink->max_messages);
|
||
|
if (!sink->messages) {
|
||
|
return NULL;
|
||
|
}
|
||
|
ao2_ref(sink, +1);
|
||
|
return sink;
|
||
|
}
|
||
|
|
||
|
/*!
|
||
|
* \brief Implementation of the stasis_message_sink_cb() callback.
|
||
|
*
|
||
|
* Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
|
||
|
* it has to do with how we previously loaded modules, using \c RTLD_LAZY.
|
||
|
*
|
||
|
* The stasis_message_sink_cb() function gave us a layer of indirection so that
|
||
|
* the initial lazy binding would still work as expected.
|
||
|
*/
|
||
|
static void message_sink_cb(void *data, struct stasis_subscription *sub,
|
||
|
struct stasis_message *message)
|
||
|
{
|
||
|
struct stasis_message_sink *sink = data;
|
||
|
|
||
|
SCOPED_MUTEX(lock, &sink->lock);
|
||
|
|
||
|
if (stasis_subscription_final_message(sub, message)) {
|
||
|
sink->is_done = 1;
|
||
|
ast_cond_signal(&sink->cond);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (stasis_subscription_change_type() == stasis_message_type(message)) {
|
||
|
/* Ignore subscription changes */
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (sink->num_messages == sink->max_messages) {
|
||
|
size_t new_max_messages = sink->max_messages * 2;
|
||
|
struct stasis_message **new_messages = ast_realloc(
|
||
|
sink->messages,
|
||
|
sizeof(*new_messages) * new_max_messages);
|
||
|
if (!new_messages) {
|
||
|
return;
|
||
|
}
|
||
|
sink->max_messages = new_max_messages;
|
||
|
sink->messages = new_messages;
|
||
|
}
|
||
|
|
||
|
ao2_ref(message, +1);
|
||
|
sink->messages[sink->num_messages++] = message;
|
||
|
ast_cond_signal(&sink->cond);
|
||
|
}
|
||
|
|
||
|
stasis_subscription_cb stasis_message_sink_cb(void)
|
||
|
{
|
||
|
return message_sink_cb;
|
||
|
}
|
||
|
|
||
|
|
||
|
int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
|
||
|
int num_messages, int timeout_millis)
|
||
|
{
|
||
|
struct timespec deadline = make_deadline(timeout_millis);
|
||
|
|
||
|
SCOPED_MUTEX(lock, &sink->lock);
|
||
|
while (sink->num_messages < num_messages) {
|
||
|
int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
|
||
|
|
||
|
if (r == ETIMEDOUT) {
|
||
|
break;
|
||
|
}
|
||
|
if (r != 0) {
|
||
|
ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
|
||
|
strerror(r));
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
return sink->num_messages;
|
||
|
}
|
||
|
|
||
|
int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
|
||
|
int num_messages, int timeout_millis)
|
||
|
{
|
||
|
struct timespec deadline = make_deadline(timeout_millis);
|
||
|
|
||
|
SCOPED_MUTEX(lock, &sink->lock);
|
||
|
while (sink->num_messages == num_messages) {
|
||
|
int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
|
||
|
|
||
|
if (r == ETIMEDOUT) {
|
||
|
break;
|
||
|
}
|
||
|
if (r != 0) {
|
||
|
ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
|
||
|
strerror(r));
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
return sink->num_messages;
|
||
|
}
|
||
|
|
||
|
int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
|
||
|
stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
|
||
|
{
|
||
|
struct timespec deadline = make_deadline(timeout_millis);
|
||
|
|
||
|
SCOPED_MUTEX(lock, &sink->lock);
|
||
|
|
||
|
/* wait for the start */
|
||
|
while (sink->num_messages < start + 1) {
|
||
|
int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
|
||
|
|
||
|
if (r == ETIMEDOUT) {
|
||
|
/* Timed out waiting for the start */
|
||
|
return -1;
|
||
|
}
|
||
|
if (r != 0) {
|
||
|
ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
|
||
|
strerror(r));
|
||
|
return -2;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
while (!cmp_cb(sink->messages[start], data)) {
|
||
|
++start;
|
||
|
|
||
|
while (sink->num_messages < start + 1) {
|
||
|
int r = ast_cond_timedwait(&sink->cond,
|
||
|
&sink->lock, &deadline);
|
||
|
|
||
|
if (r == ETIMEDOUT) {
|
||
|
return -1;
|
||
|
}
|
||
|
if (r != 0) {
|
||
|
ast_log(LOG_ERROR,
|
||
|
"Unexpected condition error: %s\n",
|
||
|
strerror(r));
|
||
|
return -2;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return start;
|
||
|
}
|
||
|
|
||
|
/*!
 * \brief Create a test message of type stasis_test_message_type().
 *
 * The payload is a freshly allocated one-byte ao2 object; only its identity
 * matters, not its contents.
 *
 * \return The result of stasis_message_create() for the new payload.
 * \return \c NULL if the message type is unavailable or the payload
 *         allocation fails.
 */
struct stasis_message *stasis_test_message_create(void)
{
	void *payload;
	struct stasis_message *msg;

	if (!stasis_test_message_type()) {
		return NULL;
	}

	/* Only a unique pointer is needed, so the smallest object will do. */
	payload = ao2_alloc(1, NULL);
	if (!payload) {
		return NULL;
	}

	msg = stasis_message_create(stasis_test_message_type(), payload);

	/* Drop the local payload reference now that creation was attempted. */
	ao2_cleanup(payload);

	return msg;
}
|
||
|
|
||
|
static int unload_module(void)
|
||
|
{
|
||
|
STASIS_MESSAGE_TYPE_CLEANUP(stasis_test_message_type);
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
static int load_module(void)
|
||
|
{
|
||
|
if (STASIS_MESSAGE_TYPE_INIT(stasis_test_message_type) != 0) {
|
||
|
return AST_MODULE_LOAD_DECLINE;
|
||
|
}
|
||
|
|
||
|
return AST_MODULE_LOAD_SUCCESS;
|
||
|
}
|
||
|
|
||
|
/*
 * Module registration: wires load_module()/unload_module() into the module
 * loader under the description "Stasis test utilities".
 */
AST_MODULE_INFO(ASTERISK_GPL_KEY,
	AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER,
	"Stasis test utilities",
	.support_level = AST_MODULE_SUPPORT_CORE,
	.load = load_module,
	.unload = unload_module,
	.load_pri = AST_MODPRI_APP_DEPEND,
);
|