--- apps/app_queue.c (revision 1) +++ apps/app_queue.c (working copy) @@ -391,7 +391,7 @@ #define QUEUE_EVENT_VARIABLES 3 struct call_queue { - ast_mutex_t lock; + char name[80]; /*!< Name */ char moh[80]; /*!< Music On Hold class to be used */ char announce[80]; /*!< Announcement to play when call is answered */ @@ -463,6 +463,7 @@ static int set_member_paused(const char *queuename, const char *interface, int paused); static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan); +static void free_members(struct call_queue *q, int all); static char *int2strat(int strategy); @@ -580,6 +581,29 @@ return -1; } +/*! + * \brief removes a call_queue from the list of call_queues + */ +static void remove_queue(struct call_queue *q) +{ + AST_LIST_LOCK(&queues); + if (AST_LIST_REMOVE(&queues, q, list)) { + ao2_ref(q, -1); + } + AST_LIST_UNLOCK(&queues); +} + +static void destroy_queue(void *obj) +{ + struct call_queue *q = obj; + + if (q->members) { + free_members(q, 1); + ao2_ref(q->members, -1); + } + +} + /*! \brief Insert the 'new' entry after the 'prev' entry of queue 'q' */ static inline void insert_entry(struct call_queue *q, struct queue_ent *prev, struct queue_ent *new, int *pos) { @@ -595,6 +619,11 @@ q->head = new; } new->next = cur; + + /* every queue_ent must have a reference to it's parent call_queue, this + * reference does not go away until the end of the queue_ent's life, meaning + * that even when the queue_ent leaves the call_queue this ref must remain. */ + ao2_ref(q, +1); new->parent = q; new->pos = ++(*pos); new->opos = *pos; @@ -618,7 +647,7 @@ struct ao2_iterator mem_iter; enum queue_member_status result = QUEUE_NO_MEMBERS; - ast_mutex_lock(&q->lock); + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((member = ao2_iterator_next(&mem_iter))) { if (max_penalty && (member->penalty > max_penalty)) { @@ -640,14 +669,13 @@ result = QUEUE_NO_REACHABLE_MEMBERS; ao2_ref(member, -1); break; - default: - ast_mutex_unlock(&q->lock); + default: + ao2_unlock(q); ao2_ref(member, -1); return QUEUE_NORMAL; } } - - ast_mutex_unlock(&q->lock); + ao2_unlock(q); return result; } @@ -665,7 +693,7 @@ AST_LIST_LOCK(&queues); AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((cur = ao2_iterator_next(&mem_iter))) { char tmp_interface[80]; @@ -702,7 +730,7 @@ } ao2_ref(cur, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } AST_LIST_UNLOCK(&queues); @@ -852,8 +880,7 @@ { struct call_queue *q; - if ((q = ast_calloc(1, sizeof(*q)))) { - ast_mutex_init(&q->lock); + if ((q = ao2_alloc(sizeof(*q), destroy_queue))) { ast_copy_string(q->name, queuename, sizeof(q->name)); } return q; @@ -990,7 +1017,7 @@ AST_LIST_LOCK(&queues); AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((mem = ao2_iterator_next(&mem_iter))) { if (!strcasecmp(mem->state_interface, interface)) { @@ -1000,7 +1027,7 @@ } ao2_ref(mem, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); if (ret) break; } @@ -1260,17 +1287,10 @@ } } -static void destroy_queue(struct call_queue *q) -{ - free_members(q, 1); - ast_mutex_destroy(&q->lock); - ao2_ref(q->members, -1); - free(q); -} - /*!\brief Reload a single queue via realtime. \return Return the queue, or NULL if it doesn't exist. \note Should be called with the global qlock locked. */ + static struct call_queue *find_queue_by_name_rt(const char *queuename, struct ast_variable *queue_vars, struct ast_config *member_config) { struct ast_variable *v; @@ -1289,14 +1309,14 @@ /* Static queues override realtime. */ if (q) { - ast_mutex_lock(&q->lock); + ao2_lock(q); if (!q->realtime) { if (q->dead) { - ast_mutex_unlock(&q->lock); + ao2_unlock(q); return NULL; } else { ast_log(LOG_WARNING, "Static queue '%s' already exists. Not loading from realtime\n", q->name); - ast_mutex_unlock(&q->lock); + ao2_unlock(q); return q; } } @@ -1317,11 +1337,10 @@ /* Delete if unused (else will be deleted when last caller leaves). */ if (!q->count) { /* Delete. */ - AST_LIST_REMOVE(&queues, q, list); - ast_mutex_unlock(&q->lock); - destroy_queue(q); + ao2_unlock(q); + remove_queue(q); } else - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } return NULL; } @@ -1330,10 +1349,10 @@ if (!q) { if (!(q = alloc_queue(queuename))) return NULL; - ast_mutex_lock(&q->lock); + ao2_lock(q); clear_queue(q); q->realtime = 1; - + /* manwe & saghul FTW!!*/ /*Before we initialize the queue, we need to set the strategy, so that linear strategy @@ -1405,16 +1424,15 @@ while ((m = ao2_iterator_next(&mem_iter))) { if (m->dead) { ao2_unlink(q->members, m); - ast_mutex_unlock(&q->lock); + ao2_unlock(q); remove_from_interfaces(m->state_interface); - ast_mutex_lock(&q->lock); + ao2_lock(q); q->membercount--; } ao2_ref(m, -1); } + ao2_unlock(q); - ast_mutex_unlock(&q->lock); - return q; } @@ -1453,7 +1471,7 @@ return; } - ast_mutex_lock(&q->lock); + ao2_lock(q); /* Temporarily set realtime members dead so we can detect deleted ones.*/ mem_iter = ao2_iterator_init(q->members, 0); @@ -1476,14 +1494,14 @@ while ((m = ao2_iterator_next(&mem_iter))) { if (m->dead) { ao2_unlink(q->members, m); - ast_mutex_unlock(&q->lock); + ao2_unlock(q); remove_from_interfaces(m->state_interface); - ast_mutex_lock(&q->lock); + ao2_lock(q); q->membercount--; } ao2_ref(m, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); ast_config_destroy(member_config); } @@ -1550,7 +1568,7 @@ return res; AST_LIST_LOCK(&queues); - ast_mutex_lock(&q->lock); + ao2_lock(q); /* This is our one */ stat = get_member_status(q, qe->max_penalty); @@ -1596,7 +1614,7 @@ if (option_debug) ast_log(LOG_DEBUG, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos ); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); AST_LIST_UNLOCK(&queues); return res; @@ -1769,11 +1787,10 @@ /* Calculate holdtime using an exponential average */ /* Thanks to SRT for this contribution */ /* 2^2 (4) is the filter coefficient; a higher exponent would give old entries more weight */ - - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); oldvalue = qe->parent->holdtime; qe->parent->holdtime = (((oldvalue << 2) - oldvalue) + newholdtime) >> 2; - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); } @@ -1785,7 +1802,7 @@ if (!(q = qe->parent)) return; - ast_mutex_lock(&q->lock); + ao2_lock(q); prev = NULL; for (cur = q->head; cur; cur = cur->next) { @@ -1809,14 +1826,11 @@ prev = cur; } } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); if (q->dead && !q->count) { /* It's dead and nobody is in it, so kill it */ - AST_LIST_LOCK(&queues); - AST_LIST_REMOVE(&queues, q, list); - AST_LIST_UNLOCK(&queues); - destroy_queue(q); + remove_queue(q); } } @@ -1851,7 +1865,7 @@ AST_LIST_TRAVERSE(&queues, q, list) { if (q == rq) /* don't check myself, could deadlock */ continue; - ast_mutex_lock(&q->lock); + ao2_lock(q); if (q->count && q->members) { if ((mem = ao2_find(q->members, member, OBJ_POINTER))) { ast_log(LOG_DEBUG, "Found matching member %s in queue '%s'\n", mem->interface, q->name); @@ -1862,7 +1876,7 @@ ao2_ref(mem, -1); } } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); if (found) break; } @@ -1976,10 +1990,9 @@ tmp->stillgoing = 0; update_status(tmp->member->interface, ast_device_state(tmp->member->state_interface)); - - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); qe->parent->rrpos++; - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); qe->linpos++; @@ -2205,7 +2218,7 @@ static void record_abandoned(struct queue_ent *qe) { - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); manager_event(EVENT_FLAG_AGENT, "QueueCallerAbandon", "Queue: %s\r\n" "Uniqueid: %s\r\n" @@ -2215,7 +2228,7 @@ qe->parent->name, qe->chan->uniqueid, qe->pos, qe->opos, (int)(time(NULL) - qe->start)); qe->parent->callsabandoned++; - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); } /*! \brief RNA == Ring No Answer. Common code that is executed when we try a queue member and they don't answer. */ @@ -2516,7 +2529,7 @@ } else { /* This needs a lock. How many members are available to be served? */ - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); ch = qe->parent->head; @@ -2562,7 +2575,7 @@ res = 0; } - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); } return res; @@ -2654,13 +2667,13 @@ static int update_queue(struct call_queue *q, struct member *member, int callcompletedinsl) { - ast_mutex_lock(&q->lock); + ao2_lock(q); time(&member->lastcall); member->calls++; q->callscompleted++; if (callcompletedinsl) q->callscompletedinsl++; - ast_mutex_unlock(&q->lock); + ao2_unlock(q); return 0; } @@ -2950,7 +2963,7 @@ /* Hold the lock while we setup the outgoing calls */ if (use_weight) AST_LIST_LOCK(&queues); - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); if (option_debug) ast_log(LOG_DEBUG, "%s is trying to call a queue member.\n", qe->chan->name); @@ -2967,7 +2980,7 @@ AST_LIST_HEAD(, ast_dialed_interface) *dialed_interfaces; if (!tmp) { ao2_ref(cur, -1); - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); if (use_weight) AST_LIST_UNLOCK(&queues); goto out; @@ -2975,7 +2988,7 @@ if (!datastore) { if (!(datastore = ast_channel_datastore_alloc(&dialed_interface_info, NULL))) { ao2_ref(cur, -1); - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); if (use_weight) AST_LIST_UNLOCK(&queues); free(tmp); @@ -2984,7 +2997,7 @@ datastore->inheritance = DATASTORE_INHERIT_FOREVER; if (!(dialed_interfaces = ast_calloc(1, sizeof(*dialed_interfaces)))) { ao2_ref(cur, -1); - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); if (use_weight) AST_LIST_UNLOCK(&queues); free(tmp); @@ -3021,7 +3034,7 @@ if (strncasecmp(cur->interface, "Local/", 6)) { if (!(di = ast_calloc(1, sizeof(*di) + strlen(cur->interface)))) { ao2_ref(cur, -1); - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); if (use_weight) AST_LIST_UNLOCK(&queues); free(tmp); @@ -3060,7 +3073,7 @@ else to = (qe->parent->timeout) ? qe->parent->timeout * 1000 : -1; ++qe->pending; - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); ring_one(qe, outgoing, &numbusies); if (use_weight) AST_LIST_UNLOCK(&queues); @@ -3076,14 +3089,14 @@ ast_channel_datastore_free(datastore); } ast_channel_unlock(qe->chan); - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); if (qe->parent->strategy == QUEUE_STRATEGY_RRMEMORY) { store_next_rr(qe, outgoing); } if (qe->parent->strategy == QUEUE_STRATEGY_LINEAR) { store_next_lin(qe, outgoing); } - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); peer = lpeer ? lpeer->chan : NULL; if (!peer) { qe->pending = 0; @@ -3107,9 +3120,9 @@ /* Update parameters for the queue */ time(&now); recalc_holdtime(qe, (now - qe->start)); - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); callcompletedinsl = ((now - qe->start) <= qe->parent->servicelevel); - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); member = lpeer->member; /* Increment the refcount for this member, since we're going to be using it for awhile in here. */ ao2_ref(member, 1); @@ -3325,9 +3338,10 @@ if ((queue_end_bridge = ao2_alloc(sizeof(*queue_end_bridge), NULL))) { queue_end_bridge->q = qe->parent; queue_end_bridge->chan = qe->chan; - bridge_config.end_bridge_callback = end_bridge_callback; - bridge_config.end_bridge_callback_data = queue_end_bridge; - bridge_config.end_bridge_callback_data_fixup = end_bridge_callback_data_fixup; + + bridge_config.end_bridge_callback = end_bridge_callback; + bridge_config.end_bridge_callback_data = queue_end_bridge; + bridge_config.end_bridge_callback_data_fixup = end_bridge_callback_data_fixup; /* Since queue_end_bridge can survive beyond the life of this call to Queue, we need * to make sure to increase the refcount of this queue so it cannot be freed until we * are done with it. We remove this reference in end_bridge_callback. @@ -3485,9 +3499,9 @@ AST_LIST_LOCK(&queues); AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + ao2_lock(q); if (strcmp(q->name, queuename)) { - ast_mutex_unlock(&q->lock); + ao2_unlock(q); continue; } @@ -3496,7 +3510,7 @@ if (!mem->dynamic) { res = RES_NOT_DYNAMIC; ao2_ref(mem, -1); - ast_mutex_unlock(&q->lock); + ao2_unlock(q); break; } q->membercount--; @@ -3516,7 +3530,7 @@ } else { res = RES_EXISTS; } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); break; } @@ -3539,7 +3553,7 @@ AST_LIST_LOCK(&queues); - ast_mutex_lock(&q->lock); + ao2_lock(q); if ((old_member = interface_exists(q, interface)) == NULL) { if ((new_member = create_queue_member(interface, membername, penalty, paused, state_interface))) { add_to_interfaces(new_member->state_interface); @@ -3575,7 +3589,7 @@ ao2_ref(old_member, -1); res = RES_EXISTS; } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); AST_LIST_UNLOCK(&queues); return res; @@ -3594,7 +3608,7 @@ AST_LIST_LOCK(&queues); AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + ao2_lock(q); if (ast_strlen_zero(queuename) || !strcasecmp(q->name, queuename)) { if ((mem = interface_exists(q, interface))) { found++; @@ -3619,7 +3633,7 @@ ao2_ref(mem, -1); } } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } AST_LIST_UNLOCK(&queues); @@ -3653,10 +3667,10 @@ queue_name = entry->key + strlen(pm_family) + 2; AST_LIST_TRAVERSE(&queues, cur_queue, list) { - ast_mutex_lock(&cur_queue->lock); + ao2_lock(cur_queue); if (!strcmp(queue_name, cur_queue->name)) break; - ast_mutex_unlock(&cur_queue->lock); + ao2_unlock(cur_queue); } if (!cur_queue) @@ -3669,7 +3683,7 @@ ast_db_del(pm_family, queue_name); continue; } else - ast_mutex_unlock(&cur_queue->lock); + ao2_unlock(cur_queue); if (ast_db_get(pm_family, queue_name, queue_data, PM_MAX_LEN)) continue; @@ -4046,7 +4060,7 @@ AST_APP_ARG(agi); ); /* Our queue entry */ - struct queue_ent qe; + struct queue_ent qe = { 0 }; if (ast_strlen_zero(data)) { ast_log(LOG_WARNING, "Queue requires an argument: queuename[|options[|URL[|announceoverride[|timeout[|agi]]]]]\n"); @@ -4059,7 +4073,6 @@ lu = ast_module_user_add(chan); /* Setup our queue entry */ - memset(&qe, 0, sizeof(qe)); qe.start = time(NULL); /* set the expire time based on the supplied timeout; */ @@ -4279,6 +4292,12 @@ set_queue_result(chan, reason); res = 0; } + if (qe.parent) { + /* every queue_ent is given a reference to it's parent call_queue when it joins the queue. + * This ref must be taken away right before the queue_ent is destroyed. In this case + * the queue_ent is about to be returned on the stack */ + ao2_ref(qe.parent, -1); + } ast_module_user_remove(lu); return res; @@ -4302,7 +4321,7 @@ lu = ast_module_user_add(chan); if ((q = load_realtime_queue(data))) { - ast_mutex_lock(&q->lock); + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((m = ao2_iterator_next(&mem_iter))) { /* Count the agents who are logged in and presently answering calls */ @@ -4311,7 +4330,7 @@ } ao2_ref(m, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } else ast_log(LOG_WARNING, "queue %s was not found\n", data); @@ -4340,7 +4359,7 @@ AST_LIST_LOCK(&queues); AST_LIST_TRAVERSE(&queues, q, list) { if (!strcasecmp(q->name, data)) { - ast_mutex_lock(&q->lock); + ao2_lock(q); break; } } @@ -4348,7 +4367,7 @@ if (q) { count = q->count; - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } else if ((var = ast_load_realtime("queues", "name", data, NULL))) { /* if the queue is realtime but was not found in memory, this * means that the queue had been deleted from memory since it was @@ -4383,7 +4402,7 @@ AST_LIST_LOCK(&queues); AST_LIST_TRAVERSE(&queues, q, list) { if (!strcasecmp(q->name, data)) { - ast_mutex_lock(&q->lock); + ao2_lock(q); break; } } @@ -4409,7 +4428,7 @@ } ao2_ref(m, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } else ast_log(LOG_WARNING, "queue %s was not found\n", data); @@ -4523,12 +4542,12 @@ new = 0; if (q) { if (!new) - ast_mutex_lock(&q->lock); + ao2_lock(q); /* Check if a queue with this name already exists */ if (q->found) { ast_log(LOG_WARNING, "Queue '%s' already defined! Skipping!\n", cat); if (!new) - ast_mutex_unlock(&q->lock); + ao2_unlock(q); continue; } /* manwe & saghul FTW!! */ @@ -4640,7 +4659,7 @@ if (new) { AST_LIST_INSERT_HEAD(&queues, q, list); } else - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } } } @@ -4648,12 +4667,9 @@ AST_LIST_TRAVERSE_SAFE_BEGIN(&queues, q, list) { if (q->dead) { AST_LIST_REMOVE_CURRENT(&queues, list); - if (!q->count) - destroy_queue(q); - else - ast_log(LOG_DEBUG, "XXX Leaking a little memory :( XXX\n"); + ao2_ref(q, -1); } else { - ast_mutex_lock(&q->lock); + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((cur = ao2_iterator_next(&mem_iter))) { if (cur->dynamic) @@ -4661,7 +4677,7 @@ cur->status = ast_device_state(cur->interface); ao2_ref(cur, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } } AST_LIST_TRAVERSE_SAFE_END; @@ -4722,10 +4738,10 @@ return RESULT_SUCCESS; } AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + ao2_lock(q); if (queue_show) { if (strcasecmp(q->name, argv[2]) != 0) { - ast_mutex_unlock(&q->lock); + ao2_unlock(q); if (!AST_LIST_NEXT(q, list)) { ast_cli(fd, "No such queue: %s.%s",argv[2], term); break; @@ -4811,7 +4827,7 @@ astman_append(s, "%s", term); else ast_cli(fd, "%s", term); - ast_mutex_unlock(&q->lock); + ao2_unlock(q); if (queue_show) break; } @@ -4885,7 +4901,7 @@ snprintf(idText, sizeof(idText), "ActionID: %s\r\n", id); AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + ao2_lock(q); /* List queue properties */ if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) { @@ -4943,7 +4959,7 @@ (long) (now - qe->start), idText); } } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } astman_append(s, @@ -5210,19 +5226,19 @@ /* here is the case for 3, */ if (!AST_LIST_EMPTY(&queues)) { /* XXX unnecessary ? the traverse does that for us */ AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((m = ao2_iterator_next(&mem_iter))) { if (++which > state) { char *tmp; - ast_mutex_unlock(&q->lock); + ao2_unlock(q); tmp = ast_strdup(m->interface); ao2_ref(m, -1); return tmp; } ao2_ref(m, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } }