514 lines
13 KiB
C
514 lines
13 KiB
C
|
/*
|
||
|
* Asterisk -- An open source telephony toolkit.
|
||
|
*
|
||
|
* Copyright (C) 2013, Digium, Inc.
|
||
|
*
|
||
|
* David M. Lee, II <dlee@digium.com>
|
||
|
*
|
||
|
* See http://www.asterisk.org for more information about
|
||
|
* the Asterisk project. Please do not directly contact
|
||
|
* any of the maintainers of this project for assistance;
|
||
|
* the project provides a web site, mailing lists and IRC
|
||
|
* channels for your use.
|
||
|
*
|
||
|
* This program is free software, distributed under the terms of
|
||
|
* the GNU General Public License Version 2. See the LICENSE file
|
||
|
* at the top of the source tree.
|
||
|
*/
|
||
|
|
||
|
/*! \file
|
||
|
*
|
||
|
* \brief Asterisk endpoint API.
|
||
|
*
|
||
|
* \author David M. Lee, II <dlee@digium.com>
|
||
|
*/
|
||
|
|
||
|
/*** MODULEINFO
|
||
|
<support_level>core</support_level>
|
||
|
***/
|
||
|
|
||
|
#include "asterisk.h"
|
||
|
|
||
|
#include "asterisk/astobj2.h"
|
||
|
#include "asterisk/endpoints.h"
|
||
|
#include "asterisk/stasis.h"
|
||
|
#include "asterisk/stasis_channels.h"
|
||
|
#include "asterisk/stasis_endpoints.h"
|
||
|
#include "asterisk/stasis_message_router.h"
|
||
|
#include "asterisk/stringfields.h"
|
||
|
#include "asterisk/_private.h"
|
||
|
|
||
|
/*! Buckets for endpoint->channel mappings. Keep it prime! */
|
||
|
#define ENDPOINT_CHANNEL_BUCKETS 127
|
||
|
|
||
|
/*! Buckets for endpoint hash. Keep it prime! */
|
||
|
#define ENDPOINT_BUCKETS 127
|
||
|
|
||
|
/*! Buckets for technology endpoints. */
|
||
|
#define TECH_ENDPOINT_BUCKETS 11
|
||
|
|
||
|
static struct ao2_container *endpoints;
|
||
|
|
||
|
static struct ao2_container *tech_endpoints;
|
||
|
|
||
|
struct ast_endpoint {
|
||
|
AST_DECLARE_STRING_FIELDS(
|
||
|
AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
|
||
|
AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */
|
||
|
AST_STRING_FIELD(id); /*!< tech/resource id */
|
||
|
);
|
||
|
/*! Endpoint's current state */
|
||
|
enum ast_endpoint_state state;
|
||
|
/*!
|
||
|
* \brief Max channels for this endpoint. -1 means unlimited or unknown.
|
||
|
*
|
||
|
* Note that this simply documents the limits of an endpoint, and does
|
||
|
* nothing to try to enforce the limit.
|
||
|
*/
|
||
|
int max_channels;
|
||
|
/*! Topic for this endpoint's messages */
|
||
|
struct stasis_cp_single *topics;
|
||
|
/*! Router for handling this endpoint's messages */
|
||
|
struct stasis_message_router *router;
|
||
|
/*! ast_str_container of channels associated with this endpoint */
|
||
|
struct ao2_container *channel_ids;
|
||
|
/*! Forwarding subscription from an endpoint to its tech endpoint */
|
||
|
struct stasis_forward *tech_forward;
|
||
|
};
|
||
|
|
||
|
AO2_STRING_FIELD_HASH_FN(ast_endpoint, id)
|
||
|
AO2_STRING_FIELD_CMP_FN(ast_endpoint, id)
|
||
|
|
||
|
struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
|
||
|
{
|
||
|
struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
|
||
|
|
||
|
if (!endpoint) {
|
||
|
endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
|
||
|
}
|
||
|
|
||
|
return endpoint;
|
||
|
}
|
||
|
|
||
|
struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
if (!endpoint) {
|
||
|
return ast_endpoint_topic_all();
|
||
|
}
|
||
|
return stasis_cp_single_topic(endpoint->topics);
|
||
|
}
|
||
|
|
||
|
struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
if (!endpoint) {
|
||
|
return ast_endpoint_topic_all_cached();
|
||
|
}
|
||
|
return stasis_cp_single_topic_cached(endpoint->topics);
|
||
|
}
|
||
|
|
||
|
const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
|
||
|
{
|
||
|
switch (state) {
|
||
|
case AST_ENDPOINT_UNKNOWN:
|
||
|
return "unknown";
|
||
|
case AST_ENDPOINT_OFFLINE:
|
||
|
return "offline";
|
||
|
case AST_ENDPOINT_ONLINE:
|
||
|
return "online";
|
||
|
}
|
||
|
return "?";
|
||
|
}
|
||
|
|
||
|
static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
|
||
|
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
|
||
|
|
||
|
ast_assert(endpoint != NULL);
|
||
|
ast_assert(endpoint->topics != NULL);
|
||
|
|
||
|
if (!ast_endpoint_snapshot_type()) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
snapshot = ast_endpoint_snapshot_create(endpoint);
|
||
|
if (!snapshot) {
|
||
|
return;
|
||
|
}
|
||
|
message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
|
||
|
if (!message) {
|
||
|
return;
|
||
|
}
|
||
|
stasis_publish(ast_endpoint_topic(endpoint), message);
|
||
|
}
|
||
|
|
||
|
static void endpoint_dtor(void *obj)
|
||
|
{
|
||
|
struct ast_endpoint *endpoint = obj;
|
||
|
|
||
|
/* The router should be shut down already */
|
||
|
ast_assert(stasis_message_router_is_done(endpoint->router));
|
||
|
ao2_cleanup(endpoint->router);
|
||
|
endpoint->router = NULL;
|
||
|
|
||
|
stasis_cp_single_unsubscribe(endpoint->topics);
|
||
|
endpoint->topics = NULL;
|
||
|
|
||
|
ao2_cleanup(endpoint->channel_ids);
|
||
|
endpoint->channel_ids = NULL;
|
||
|
|
||
|
ast_string_field_free_memory(endpoint);
|
||
|
}
|
||
|
|
||
|
|
||
|
int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
|
||
|
struct ast_channel *chan)
|
||
|
{
|
||
|
ast_assert(chan != NULL);
|
||
|
ast_assert(endpoint != NULL);
|
||
|
ast_assert(!ast_strlen_zero(endpoint->resource));
|
||
|
|
||
|
ast_channel_forward_endpoint(chan, endpoint);
|
||
|
|
||
|
ao2_lock(endpoint);
|
||
|
ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
|
||
|
ao2_unlock(endpoint);
|
||
|
|
||
|
endpoint_publish_snapshot(endpoint);
|
||
|
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
/*! \brief Handler for channel snapshot update */
|
||
|
static void endpoint_cache_clear(void *data,
|
||
|
struct stasis_subscription *sub,
|
||
|
struct stasis_message *message)
|
||
|
{
|
||
|
struct ast_endpoint *endpoint = data;
|
||
|
struct ast_channel_snapshot_update *update = stasis_message_data(message);
|
||
|
|
||
|
/* Only when the channel is dead do we remove it */
|
||
|
if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
ast_assert(endpoint != NULL);
|
||
|
|
||
|
ao2_lock(endpoint);
|
||
|
ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->base->uniqueid);
|
||
|
ao2_unlock(endpoint);
|
||
|
endpoint_publish_snapshot(endpoint);
|
||
|
}
|
||
|
|
||
|
static void endpoint_subscription_change(void *data,
|
||
|
struct stasis_subscription *sub,
|
||
|
struct stasis_message *message)
|
||
|
{
|
||
|
struct stasis_endpoint *endpoint = data;
|
||
|
|
||
|
if (stasis_subscription_final_message(sub, message)) {
|
||
|
ao2_cleanup(endpoint);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
|
||
|
{
|
||
|
RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
|
||
|
RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
|
||
|
int r = 0;
|
||
|
|
||
|
/* Get/create the technology endpoint */
|
||
|
if (!ast_strlen_zero(resource)) {
|
||
|
tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
|
||
|
if (!tech_endpoint) {
|
||
|
tech_endpoint = endpoint_internal_create(tech, NULL);
|
||
|
if (!tech_endpoint) {
|
||
|
return NULL;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
|
||
|
if (!endpoint) {
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
endpoint->max_channels = -1;
|
||
|
endpoint->state = AST_ENDPOINT_UNKNOWN;
|
||
|
|
||
|
if (ast_string_field_init(endpoint, 80) != 0) {
|
||
|
return NULL;
|
||
|
}
|
||
|
ast_string_field_set(endpoint, tech, tech);
|
||
|
ast_string_field_set(endpoint, resource, S_OR(resource, ""));
|
||
|
ast_string_field_build(endpoint, id, "%s%s%s",
|
||
|
tech,
|
||
|
!ast_strlen_zero(resource) ? "/" : "",
|
||
|
S_OR(resource, ""));
|
||
|
|
||
|
/* All access to channel_ids should be covered by the endpoint's
|
||
|
* lock; no extra lock needed. */
|
||
|
endpoint->channel_ids = ast_str_container_alloc_options(
|
||
|
AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
|
||
|
if (!endpoint->channel_ids) {
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
if (!ast_strlen_zero(resource)) {
|
||
|
char *topic_name;
|
||
|
int ret;
|
||
|
|
||
|
ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
|
||
|
if (ret < 0) {
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
|
||
|
topic_name);
|
||
|
ast_free(topic_name);
|
||
|
if (!endpoint->topics) {
|
||
|
return NULL;
|
||
|
}
|
||
|
stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
|
||
|
stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
|
||
|
|
||
|
endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
|
||
|
if (!endpoint->router) {
|
||
|
return NULL;
|
||
|
}
|
||
|
r |= stasis_message_router_add(endpoint->router,
|
||
|
ast_channel_snapshot_type(), endpoint_cache_clear,
|
||
|
endpoint);
|
||
|
r |= stasis_message_router_add(endpoint->router,
|
||
|
stasis_subscription_change_type(), endpoint_subscription_change,
|
||
|
endpoint);
|
||
|
if (r) {
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
|
||
|
stasis_cp_single_topic(tech_endpoint->topics));
|
||
|
|
||
|
endpoint_publish_snapshot(endpoint);
|
||
|
ao2_link(endpoints, endpoint);
|
||
|
} else {
|
||
|
char *topic_name;
|
||
|
int ret;
|
||
|
|
||
|
ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
|
||
|
if (ret < 0) {
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
|
||
|
topic_name);
|
||
|
ast_free(topic_name);
|
||
|
if (!endpoint->topics) {
|
||
|
return NULL;
|
||
|
}
|
||
|
stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
|
||
|
stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
|
||
|
|
||
|
ao2_link(tech_endpoints, endpoint);
|
||
|
}
|
||
|
|
||
|
ao2_ref(endpoint, +1);
|
||
|
return endpoint;
|
||
|
}
|
||
|
|
||
|
struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
|
||
|
{
|
||
|
if (ast_strlen_zero(tech)) {
|
||
|
ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
if (ast_strlen_zero(resource)) {
|
||
|
ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
return endpoint_internal_create(tech, resource);
|
||
|
}
|
||
|
|
||
|
static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
|
||
|
|
||
|
if (!ast_endpoint_snapshot_type()) {
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
snapshot = ast_endpoint_snapshot_create(endpoint);
|
||
|
if (!snapshot) {
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
|
||
|
}
|
||
|
|
||
|
void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
|
||
|
|
||
|
if (endpoint == NULL) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
ao2_unlink(endpoints, endpoint);
|
||
|
endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
|
||
|
|
||
|
clear_msg = create_endpoint_snapshot_message(endpoint);
|
||
|
if (clear_msg) {
|
||
|
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
|
||
|
message = stasis_cache_clear_create(clear_msg);
|
||
|
if (message) {
|
||
|
stasis_publish(ast_endpoint_topic(endpoint), message);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* Bump refcount to hold on to the router */
|
||
|
ao2_ref(endpoint->router, +1);
|
||
|
stasis_message_router_unsubscribe(endpoint->router);
|
||
|
}
|
||
|
|
||
|
const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
if (!endpoint) {
|
||
|
return NULL;
|
||
|
}
|
||
|
return endpoint->tech;
|
||
|
}
|
||
|
|
||
|
const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
if (!endpoint) {
|
||
|
return NULL;
|
||
|
}
|
||
|
return endpoint->resource;
|
||
|
}
|
||
|
|
||
|
const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
if (!endpoint) {
|
||
|
return NULL;
|
||
|
}
|
||
|
return endpoint->id;
|
||
|
}
|
||
|
|
||
|
enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
if (!endpoint) {
|
||
|
return AST_ENDPOINT_UNKNOWN;
|
||
|
}
|
||
|
return endpoint->state;
|
||
|
}
|
||
|
|
||
|
void ast_endpoint_set_state(struct ast_endpoint *endpoint,
|
||
|
enum ast_endpoint_state state)
|
||
|
{
|
||
|
ast_assert(endpoint != NULL);
|
||
|
ast_assert(!ast_strlen_zero(endpoint->resource));
|
||
|
|
||
|
ao2_lock(endpoint);
|
||
|
endpoint->state = state;
|
||
|
ao2_unlock(endpoint);
|
||
|
endpoint_publish_snapshot(endpoint);
|
||
|
}
|
||
|
|
||
|
void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
|
||
|
int max_channels)
|
||
|
{
|
||
|
ast_assert(endpoint != NULL);
|
||
|
ast_assert(!ast_strlen_zero(endpoint->resource));
|
||
|
|
||
|
ao2_lock(endpoint);
|
||
|
endpoint->max_channels = max_channels;
|
||
|
ao2_unlock(endpoint);
|
||
|
endpoint_publish_snapshot(endpoint);
|
||
|
}
|
||
|
|
||
|
static void endpoint_snapshot_dtor(void *obj)
|
||
|
{
|
||
|
struct ast_endpoint_snapshot *snapshot = obj;
|
||
|
int channel;
|
||
|
|
||
|
ast_assert(snapshot != NULL);
|
||
|
|
||
|
for (channel = 0; channel < snapshot->num_channels; channel++) {
|
||
|
ao2_ref(snapshot->channel_ids[channel], -1);
|
||
|
}
|
||
|
|
||
|
ast_string_field_free_memory(snapshot);
|
||
|
}
|
||
|
|
||
|
struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
|
||
|
struct ast_endpoint *endpoint)
|
||
|
{
|
||
|
struct ast_endpoint_snapshot *snapshot;
|
||
|
int channel_count;
|
||
|
struct ao2_iterator i;
|
||
|
void *obj;
|
||
|
SCOPED_AO2LOCK(lock, endpoint);
|
||
|
|
||
|
ast_assert(endpoint != NULL);
|
||
|
ast_assert(!ast_strlen_zero(endpoint->resource));
|
||
|
|
||
|
channel_count = ao2_container_count(endpoint->channel_ids);
|
||
|
|
||
|
snapshot = ao2_alloc_options(
|
||
|
sizeof(*snapshot) + channel_count * sizeof(char *),
|
||
|
endpoint_snapshot_dtor,
|
||
|
AO2_ALLOC_OPT_LOCK_NOLOCK);
|
||
|
|
||
|
if (!snapshot || ast_string_field_init(snapshot, 80) != 0) {
|
||
|
ao2_cleanup(snapshot);
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
|
||
|
endpoint->resource);
|
||
|
ast_string_field_set(snapshot, tech, endpoint->tech);
|
||
|
ast_string_field_set(snapshot, resource, endpoint->resource);
|
||
|
|
||
|
snapshot->state = endpoint->state;
|
||
|
snapshot->max_channels = endpoint->max_channels;
|
||
|
|
||
|
i = ao2_iterator_init(endpoint->channel_ids, 0);
|
||
|
while ((obj = ao2_iterator_next(&i))) {
|
||
|
/* The reference is kept so the channel id does not go away until the snapshot is gone */
|
||
|
snapshot->channel_ids[snapshot->num_channels++] = obj;
|
||
|
}
|
||
|
ao2_iterator_destroy(&i);
|
||
|
|
||
|
return snapshot;
|
||
|
}
|
||
|
|
||
|
static void endpoint_cleanup(void)
|
||
|
{
|
||
|
ao2_cleanup(endpoints);
|
||
|
endpoints = NULL;
|
||
|
|
||
|
ao2_cleanup(tech_endpoints);
|
||
|
tech_endpoints = NULL;
|
||
|
}
|
||
|
|
||
|
int ast_endpoint_init(void)
|
||
|
{
|
||
|
ast_register_cleanup(endpoint_cleanup);
|
||
|
|
||
|
endpoints = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, ENDPOINT_BUCKETS,
|
||
|
ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
|
||
|
if (!endpoints) {
|
||
|
return -1;
|
||
|
}
|
||
|
|
||
|
tech_endpoints = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
|
||
|
TECH_ENDPOINT_BUCKETS, ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
|
||
|
if (!tech_endpoints) {
|
||
|
return -1;
|
||
|
}
|
||
|
|
||
|
return 0;
|
||
|
}
|