|
|
|
@ -1126,18 +1126,126 @@ static void worker_set_state(struct worker_thread *worker, enum worker_state sta
|
|
|
|
|
ast_cond_signal(&worker->cond);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! Serializer group shutdown control object. */
|
|
|
|
|
struct ast_serializer_shutdown_group {
|
|
|
|
|
/*! Shutdown thread waits on this conditional. */
|
|
|
|
|
ast_cond_t cond;
|
|
|
|
|
/*! Count of serializers needing to shutdown. */
|
|
|
|
|
int count;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static void serializer_shutdown_group_dtor(void *vdoomed)
|
|
|
|
|
{
|
|
|
|
|
struct ast_serializer_shutdown_group *doomed = vdoomed;
|
|
|
|
|
|
|
|
|
|
ast_cond_destroy(&doomed->cond);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
|
|
|
|
|
{
|
|
|
|
|
struct ast_serializer_shutdown_group *shutdown_group;
|
|
|
|
|
|
|
|
|
|
shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
|
|
|
|
|
if (!shutdown_group) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
ast_cond_init(&shutdown_group->cond, NULL);
|
|
|
|
|
return shutdown_group;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
|
|
|
|
|
{
|
|
|
|
|
int remaining;
|
|
|
|
|
ast_mutex_t *lock;
|
|
|
|
|
|
|
|
|
|
if (!shutdown_group) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lock = ao2_object_get_lockaddr(shutdown_group);
|
|
|
|
|
ast_assert(lock != NULL);
|
|
|
|
|
|
|
|
|
|
ao2_lock(shutdown_group);
|
|
|
|
|
if (timeout) {
|
|
|
|
|
struct timeval start;
|
|
|
|
|
struct timespec end;
|
|
|
|
|
|
|
|
|
|
start = ast_tvnow();
|
|
|
|
|
end.tv_sec = start.tv_sec + timeout;
|
|
|
|
|
end.tv_nsec = start.tv_usec * 1000;
|
|
|
|
|
while (shutdown_group->count) {
|
|
|
|
|
if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
|
|
|
|
|
/* Error or timed out waiting for the count to reach zero. */
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
while (shutdown_group->count) {
|
|
|
|
|
if (ast_cond_wait(&shutdown_group->cond, lock)) {
|
|
|
|
|
/* Error */
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
remaining = shutdown_group->count;
|
|
|
|
|
ao2_unlock(shutdown_group);
|
|
|
|
|
return remaining;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief Increment the number of serializer members in the group.
|
|
|
|
|
* \since 13.5.0
|
|
|
|
|
*
|
|
|
|
|
* \param shutdown_group Group shutdown controller.
|
|
|
|
|
*
|
|
|
|
|
* \return Nothing
|
|
|
|
|
*/
|
|
|
|
|
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
|
|
|
|
|
{
|
|
|
|
|
ao2_lock(shutdown_group);
|
|
|
|
|
++shutdown_group->count;
|
|
|
|
|
ao2_unlock(shutdown_group);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief Decrement the number of serializer members in the group.
|
|
|
|
|
* \since 13.5.0
|
|
|
|
|
*
|
|
|
|
|
* \param shutdown_group Group shutdown controller.
|
|
|
|
|
*
|
|
|
|
|
* \return Nothing
|
|
|
|
|
*/
|
|
|
|
|
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
|
|
|
|
|
{
|
|
|
|
|
ao2_lock(shutdown_group);
|
|
|
|
|
--shutdown_group->count;
|
|
|
|
|
if (!shutdown_group->count) {
|
|
|
|
|
ast_cond_signal(&shutdown_group->cond);
|
|
|
|
|
}
|
|
|
|
|
ao2_unlock(shutdown_group);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct serializer {
|
|
|
|
|
/*! Threadpool the serializer will use to process the jobs. */
|
|
|
|
|
struct ast_threadpool *pool;
|
|
|
|
|
/*! Which group will wait for this serializer to shutdown. */
|
|
|
|
|
struct ast_serializer_shutdown_group *shutdown_group;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static void serializer_dtor(void *obj)
|
|
|
|
|
{
|
|
|
|
|
struct serializer *ser = obj;
|
|
|
|
|
|
|
|
|
|
ao2_cleanup(ser->pool);
|
|
|
|
|
ser->pool = NULL;
|
|
|
|
|
ao2_cleanup(ser->shutdown_group);
|
|
|
|
|
ser->shutdown_group = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static struct serializer *serializer_create(struct ast_threadpool *pool)
|
|
|
|
|
static struct serializer *serializer_create(struct ast_threadpool *pool,
|
|
|
|
|
struct ast_serializer_shutdown_group *shutdown_group)
|
|
|
|
|
{
|
|
|
|
|
struct serializer *ser;
|
|
|
|
|
|
|
|
|
@ -1147,6 +1255,7 @@ static struct serializer *serializer_create(struct ast_threadpool *pool)
|
|
|
|
|
}
|
|
|
|
|
ao2_ref(pool, +1);
|
|
|
|
|
ser->pool = pool;
|
|
|
|
|
ser->shutdown_group = ao2_bump(shutdown_group);
|
|
|
|
|
return ser;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1183,6 +1292,10 @@ static int serializer_start(struct ast_taskprocessor_listener *listener)
|
|
|
|
|
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
|
|
|
|
|
{
|
|
|
|
|
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
|
|
|
|
|
|
|
|
|
|
if (ser->shutdown_group) {
|
|
|
|
|
serializer_shutdown_group_dec(ser->shutdown_group);
|
|
|
|
|
}
|
|
|
|
|
ao2_cleanup(ser);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1192,27 +1305,35 @@ static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callb
|
|
|
|
|
.shutdown = serializer_shutdown,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
|
|
|
|
|
struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
|
|
|
|
|
struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
|
|
|
|
|
{
|
|
|
|
|
RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
|
|
|
|
|
RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
|
|
|
|
|
struct ast_taskprocessor *tps = NULL;
|
|
|
|
|
struct serializer *ser;
|
|
|
|
|
struct ast_taskprocessor_listener *listener;
|
|
|
|
|
struct ast_taskprocessor *tps;
|
|
|
|
|
|
|
|
|
|
ser = serializer_create(pool);
|
|
|
|
|
ser = serializer_create(pool, shutdown_group);
|
|
|
|
|
if (!ser) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
|
|
|
|
|
if (!listener) {
|
|
|
|
|
ao2_ref(ser, -1);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
ser = NULL; /* ownership transferred to listener */
|
|
|
|
|
/* ser ref transferred to listener */
|
|
|
|
|
|
|
|
|
|
tps = ast_taskprocessor_create_with_listener(name, listener);
|
|
|
|
|
if (!tps) {
|
|
|
|
|
return NULL;
|
|
|
|
|
if (tps && shutdown_group) {
|
|
|
|
|
serializer_shutdown_group_inc(shutdown_group);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ao2_ref(listener, -1);
|
|
|
|
|
return tps;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
|
|
|
|
|
{
|
|
|
|
|
return ast_threadpool_serializer_group(name, pool, NULL);
|
|
|
|
|
}
|
|
|
|
|