MT#62181 use shared_ptr for refcounting sockets

Use shared_ptr for automatic reference counting of sockets, instead of
doing it manually through atomic_ref_cnt.

Use shared_ptr instead of raw pointers in all appropriate places.

Add const qualifier for a few methods.

No functional change.

Change-Id: I7db07fd90a2398f0253290aa2d810f8599e3983e
mr13.5
Richard Fuchs 11 months ago
parent 21f38c5627
commit 2205da584d

@ -58,13 +58,15 @@
#include "AmEventDispatcher.h"
#include "AmSipEvent.h"
using std::make_shared;
bool SipCtrlInterface::log_parsed_messages = true;
int SipCtrlInterface::udp_rcvbuf = -1;
int SipCtrlInterface::init_udp_servers(int if_num)
{
udp_trsp_socket* udp_socket =
new udp_trsp_socket(if_num,AmConfig::SIP_Ifs[if_num].SigSockOpts
auto udp_socket =
make_shared<udp_trsp_socket>(if_num,AmConfig::SIP_Ifs[if_num].SigSockOpts
| (AmConfig::ForceOutboundIf ?
trsp_socket::force_outbound_if : 0)
| (AmConfig::UseRawSockets ?
@ -82,7 +84,6 @@ int SipCtrlInterface::init_udp_servers(int if_num)
AmConfig::SIP_Ifs[if_num].LocalIP.c_str(),
AmConfig::SIP_Ifs[if_num].LocalPort);
delete udp_socket;
return -1;
}
@ -92,7 +93,6 @@ int SipCtrlInterface::init_udp_servers(int if_num)
trans_layer::instance()->register_transport(udp_socket);
udp_sockets.push_back(udp_socket);
inc_ref(udp_socket);
for(int j=0; j<AmConfig::SIPServerThreads;j++){
udp_servers.emplace_back(udp_socket);
@ -103,7 +103,7 @@ int SipCtrlInterface::init_udp_servers(int if_num)
int SipCtrlInterface::init_tcp_servers(int if_num)
{
tcp_server_socket* tcp_socket = new tcp_server_socket(if_num);
auto tcp_socket = make_shared<tcp_server_socket>(if_num);
if(!AmConfig::SIP_Ifs[if_num].PublicIP.empty()) {
tcp_socket->set_public_ip(AmConfig::SIP_Ifs[if_num].PublicIP);
@ -119,7 +119,6 @@ int SipCtrlInterface::init_tcp_servers(int if_num)
AmConfig::SIP_Ifs[if_num].LocalIP.c_str(),
AmConfig::SIP_Ifs[if_num].LocalPort);
delete tcp_socket;
return -1;
}
@ -128,7 +127,6 @@ int SipCtrlInterface::init_tcp_servers(int if_num)
trans_layer::instance()->register_transport(tcp_socket);
tcp_sockets.push_back(tcp_socket);
inc_ref(tcp_socket);
tcp_servers.emplace_back(tcp_socket);

@ -35,9 +35,11 @@
#include <string>
#include <list>
#include <deque>
#include <memory>
using std::string;
using std::list;
using std::deque;
using std::shared_ptr;
class AmSipRequest;
class AmSipReply;
@ -69,10 +71,10 @@ class SipCtrlInterface:
AmCondition stopped;
deque<udp_trsp_socket*> udp_sockets;
deque<udp_trsp> udp_servers;
deque<tcp_server_socket*> tcp_sockets;
deque<tcp_trsp> tcp_servers;
deque<shared_ptr<udp_trsp_socket>> udp_sockets;
deque<udp_trsp> udp_servers;
deque<shared_ptr<tcp_server_socket>> tcp_sockets;
deque<tcp_trsp> tcp_servers;
int init_udp_servers(int if_num);
int init_tcp_servers(int if_num);

@ -115,9 +115,6 @@ sip_msg::~sip_msg()
delete u.reply;
}
}
if(local_socket)
dec_ref(local_socket);
}
void sip_msg::copy_msg_buf(const char* msg_buf, int msg_len)

@ -36,6 +36,8 @@
#include <list>
using std::list;
#include <memory>
using std::shared_ptr;
#include <netinet/in.h>
#include <sys/socket.h>
@ -133,7 +135,7 @@ struct sip_msg
cstring body;
sockaddr_storage local_ip;
trsp_socket* local_socket;
shared_ptr<trsp_socket> local_socket;
sockaddr_storage remote_ip;

@ -90,9 +90,6 @@ sip_trans::~sip_trans()
delete msg;
delete targets;
delete [] retr_buf;
if(retr_socket){
dec_ref(retr_socket);
}
if((type == TT_UAC) && to_tag.s){
delete [] to_tag.s;
}

@ -37,6 +37,8 @@
#include <list>
using std::list;
#include <memory>
using std::shared_ptr;
struct sip_msg;
struct sip_target_set;
@ -148,7 +150,7 @@ class sip_trans
/** Destination for retransmissions */
sockaddr_storage retr_addr;
trsp_socket* retr_socket;
shared_ptr<trsp_socket> retr_socket;
/** flags used by send_request() */
unsigned int flags;

@ -65,23 +65,21 @@ void tcp_trsp_socket::create_connected(tcp_server_socket* server_sock,
if(sd == -1)
return;
tcp_trsp_socket* sock = new tcp_trsp_socket(server_sock, server_worker,
sd,sa,evbase);
shared_ptr<tcp_trsp_socket> sock(new tcp_trsp_socket(server_sock, server_worker,
sd, sa, evbase));
inc_ref(sock);
server_worker.add_connection(sock);
sock->connected = true;
sock->add_read_event();
dec_ref(sock);
}
tcp_trsp_socket* tcp_trsp_socket::new_connection(tcp_server_socket* server_sock,
shared_ptr<tcp_trsp_socket> tcp_trsp_socket::new_connection(tcp_server_socket* server_sock,
tcp_server_worker& server_worker,
const sockaddr_storage* sa,
struct event_base* evbase)
{
return new tcp_trsp_socket(server_sock, server_worker, -1, sa, evbase);
return shared_ptr<tcp_trsp_socket>(new tcp_trsp_socket(server_sock, server_worker, -1, sa, evbase));
}
@ -260,7 +258,6 @@ int tcp_trsp_socket::send(const sockaddr_storage* sa, const char* msg,
void tcp_trsp_socket::close()
{
inc_ref(this);
server_worker.remove_connection(this);
closed = true;
@ -275,7 +272,6 @@ void tcp_trsp_socket::close()
}
generate_transport_errors();
dec_ref(this);
}
void tcp_trsp_socket::generate_transport_errors()
@ -401,8 +397,7 @@ int tcp_trsp_socket::parse_input()
copy_peer_addr(&s_msg->remote_ip);
copy_addr_to(&s_msg->local_ip);
s_msg->local_socket = this;
inc_ref(this);
s_msg->local_socket = shared_from_this();
// pass message to the parser / transaction layer
trans_layer::instance()->received_msg(s_msg);
@ -487,7 +482,7 @@ tcp_server_worker::~tcp_server_worker()
event_base_free(evbase);
}
void tcp_server_worker::add_connection(tcp_trsp_socket* client_sock)
void tcp_server_worker::add_connection(const shared_ptr<tcp_trsp_socket>& client_sock)
{
string conn_id = client_sock->get_peer_ip()
+ ":" + int2str(client_sock->get_peer_port());
@ -497,29 +492,26 @@ void tcp_server_worker::add_connection(tcp_trsp_socket* client_sock)
client_sock->get_peer_port());
connections_mut.lock();
map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(conn_id);
auto sock_it = connections.find(conn_id);
if(sock_it != connections.end()) {
dec_ref(sock_it->second);
sock_it->second = client_sock;
}
else {
connections[conn_id] = client_sock;
}
inc_ref(client_sock);
connections_mut.unlock();
}
void tcp_server_worker::remove_connection(tcp_trsp_socket* client_sock)
void tcp_server_worker::remove_connection(const tcp_trsp_socket* client_sock)
{
string conn_id = client_sock->get_peer_ip()
const string& conn_id = client_sock->get_peer_ip()
+ ":" + int2str(client_sock->get_peer_port());
DBG("removing TCP connection from %s",conn_id.c_str());
connections_mut.lock();
map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(conn_id);
auto sock_it = connections.find(conn_id);
if(sock_it != connections.end()) {
dec_ref(sock_it->second);
connections.erase(sock_it);
DBG("TCP connection from %s removed",conn_id.c_str());
}
@ -533,24 +525,20 @@ int tcp_server_worker::send(const sockaddr_storage* sa, const char* msg,
string dest = am_inet_ntop(sa,host_buf,NI_MAXHOST);
dest += ":" + int2str(am_get_port(sa));
tcp_trsp_socket* sock = NULL;
shared_ptr<tcp_trsp_socket> sock;
bool new_conn=false;
connections_mut.lock();
map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(dest);
auto sock_it = connections.find(dest);
if(sock_it != connections.end()) {
sock = sock_it->second;
inc_ref(sock);
}
else {
//TODO: add flags to avoid new connections (ex: UAs behind NAT)
tcp_trsp_socket* new_sock = tcp_trsp_socket::new_connection(server_sock, *this,
sa,evbase);
auto new_sock = tcp_trsp_socket::new_connection(server_sock, *this, sa, evbase);
connections[dest] = new_sock;
inc_ref(new_sock);
sock = new_sock;
inc_ref(sock);
new_conn = true;
}
connections_mut.unlock();
@ -559,9 +547,8 @@ int tcp_server_worker::send(const sockaddr_storage* sa, const char* msg,
// to avoid dead-lock with the event base
int ret = sock->send(sa,msg,msg_len,flags);
if((ret < 0) && new_conn) {
remove_connection(sock);
remove_connection(sock.get());
}
dec_ref(sock);
return ret;
}
@ -777,7 +764,7 @@ struct timeval* tcp_server_socket::get_idle_timeout()
return NULL;
}
tcp_trsp::tcp_trsp(tcp_server_socket* sock)
tcp_trsp::tcp_trsp(const shared_ptr<tcp_server_socket>& sock)
: transport(sock, true)
{
evbase = event_base_new();
@ -800,7 +787,7 @@ void tcp_trsp::run()
return;
}
tcp_server_socket* tcp_sock = static_cast<tcp_server_socket*>(sock);
auto tcp_sock = std::static_pointer_cast<tcp_server_socket>(sock);
tcp_sock->start_threads(*_sd_notifier);
INFO("Started SIP server TCP transport on %s:%i\n",
@ -819,7 +806,7 @@ void tcp_trsp::run()
void tcp_trsp::on_stop()
{
event_base_loopbreak(evbase);
tcp_server_socket* tcp_sock = static_cast<tcp_server_socket*>(sock);
auto tcp_sock = std::static_pointer_cast<tcp_server_socket>(sock);
tcp_sock->stop_threads();
}

@ -19,14 +19,18 @@ using std::deque;
#include <map>
#include <deque>
#include <string>
#include <memory>
using std::map;
using std::deque;
using std::string;
using std::shared_ptr;
class tcp_server_worker;
class tcp_server_socket;
class tcp_trsp_socket: public trsp_socket
class tcp_trsp_socket
: public trsp_socket,
public std::enable_shared_from_this<tcp_trsp_socket>
{
tcp_server_socket* server_sock;
tcp_server_worker& server_worker;
@ -153,7 +157,7 @@ public:
int sd, const sockaddr_storage* sa,
struct event_base* evbase);
static tcp_trsp_socket* new_connection(tcp_server_socket* server_sock,
static shared_ptr<tcp_trsp_socket> new_connection(tcp_server_socket* server_sock,
tcp_server_worker& server_worker,
const sockaddr_storage* sa,
struct event_base* evbase);
@ -164,15 +168,15 @@ public:
void copy_peer_addr(sockaddr_storage* sa);
const string& get_peer_ip() {
const string& get_peer_ip() const {
return peer_ip;
}
unsigned short get_peer_port() {
unsigned short get_peer_port() const {
return peer_port;
}
bool is_connected() {
bool is_connected() const {
return connected;
}
@ -191,7 +195,7 @@ class tcp_server_worker
tcp_server_socket* server_sock;
AmMutex connections_mut;
map<string,tcp_trsp_socket*> connections;
map<string, shared_ptr<tcp_trsp_socket>> connections;
protected:
void run();
@ -204,8 +208,8 @@ public:
int send(const sockaddr_storage* sa, const char* msg,
const int msg_len, unsigned int flags);
void add_connection(tcp_trsp_socket* client_sock);
void remove_connection(tcp_trsp_socket* client_sock);
void add_connection(const shared_ptr<tcp_trsp_socket>& client_sock);
void remove_connection(const tcp_trsp_socket* client_sock);
};
class tcp_server_socket: public trsp_socket
@ -280,7 +284,7 @@ protected:
public:
/** @see transport */
tcp_trsp(tcp_server_socket* sock);
tcp_trsp(const shared_ptr<tcp_server_socket>& sock);
~tcp_trsp();
};

@ -90,7 +90,7 @@ void trans_layer::register_ua(sip_ua* ua)
this->ua = ua;
}
int trans_layer::register_transport(trsp_socket* trsp)
int trans_layer::register_transport(const shared_ptr<trsp_socket>& trsp)
{
int if_num = trsp->get_if();
if(transports.size() <= (size_t)if_num)
@ -147,9 +147,7 @@ int trans_layer::set_trsp_socket(sip_msg* msg, const cstring& next_trsp,
}
}
if(msg->local_socket) dec_ref(msg->local_socket);
msg->local_socket = prot_sock_it->second;
inc_ref(msg->local_socket);
return 0;
}
@ -267,9 +265,10 @@ int trans_layer::send_reply(sip_msg* msg, const trans_ticket* tt,
sip_msg* req = t->msg;
assert(req);
// patch Contact-HF
auto local_socket = req->local_socket;
// patch Contact-HF
vector<string> contact_buf;
trsp_socket* local_socket = req->local_socket;
if(!local_socket->is_opt_set(trsp_socket::no_transport_in_contact)) {
cstring trsp(local_socket->get_transport());
@ -279,7 +278,7 @@ int trans_layer::send_reply(sip_msg* msg, const trans_ticket* tt,
for(list<sip_header*>::iterator contact_it = msg->contacts.begin();
contact_it != msg->contacts.end(); contact_it++, contact_buf_it++) {
patch_contact_transport(*contact_it,trsp,*contact_buf_it);
patch_contact_transport(*contact_it, trsp, *contact_buf_it);
}
}
@ -620,8 +619,6 @@ int trans_layer::send_reply(sip_msg* msg, const trans_ticket* tt,
t->retr_buf = reply_buf;
t->retr_len = reply_len;
memcpy(&t->retr_addr,&remote_ip,sizeof(sockaddr_storage));
inc_ref(local_socket);
if(t->retr_socket) dec_ref(t->retr_socket);
t->retr_socket = local_socket;
if(logger) {
@ -1197,7 +1194,6 @@ static int generate_and_parse_new_msg(sip_msg* msg, sip_msg*& p_msg)
// copy msg->remote_ip
memcpy(&p_msg->remote_ip,&msg->remote_ip,sizeof(sockaddr_storage));
p_msg->local_socket = msg->local_socket;
inc_ref(p_msg->local_socket);
return 0;
}
@ -1531,7 +1527,6 @@ int trans_layer::cancel(trans_ticket* tt, const cstring& dialog_id,
memcpy(&p_msg->remote_ip,&req->remote_ip,sizeof(sockaddr_storage));
p_msg->local_socket = req->local_socket;
inc_ref(p_msg->local_socket);
DBG("Sending to %s:%i:\n<%.*s>\n",
get_addr_str(&p_msg->remote_ip).c_str(),
@ -2095,8 +2090,6 @@ int trans_layer::update_uac_request(trans_bucket* bucket, sip_trans*& t,
// copy destination address
memcpy(&t->retr_addr,&msg->remote_ip,sizeof(sockaddr_storage));
inc_ref(msg->local_socket);
if(t->retr_socket) dec_ref(t->retr_socket);
t->retr_socket = msg->local_socket;
// remove the message;
@ -2717,7 +2710,7 @@ int trans_layer::try_next_ip(trans_bucket* bucket, sip_trans* tr,
// copy the new address back
memcpy(&tr->msg->remote_ip,&sa,sizeof(sockaddr_storage));
trsp_socket* old_sock = tr->msg->local_socket;
auto old_sock = tr->msg->local_socket;
int out_interface = old_sock->get_if();
if(set_trsp_socket(tr->msg,next_trsp,out_interface) < 0)
return -1;

@ -47,6 +47,9 @@ using std::string;
#include <map>
using std::map;
#include <memory>
using std::shared_ptr;
struct sip_msg;
struct sip_uri;
class sip_trans;
@ -125,7 +128,7 @@ private:
sip_ua* ua;
struct less_case_i { bool operator ()(const string& lhs, const string& rhs) const; };
typedef map<string,trsp_socket*,less_case_i> prot_collection;
typedef map<string, shared_ptr<trsp_socket>, less_case_i> prot_collection;
vector<prot_collection> transports;
@ -153,7 +156,7 @@ public:
* Register a transport instance.
* This method MUST be called at least once.
*/
int register_transport(trsp_socket* trsp);
int register_transport(const shared_ptr<trsp_socket>& trsp);
/**
* Clears all registered transport instances.

@ -36,11 +36,13 @@
#include <string>
using std::string;
#include <memory>
using std::shared_ptr;
#define DEFAULT_TCP_CONNECT_TIMEOUT 2000 /* 2 seconds */
#define DEFAULT_TCP_IDLE_TIMEOUT 3600000 /* 1 hour */
class trsp_socket
: public atomic_ref_cnt
{
public:
enum socket_options {
@ -156,11 +158,11 @@ public:
class transport: public AmThread
{
protected:
trsp_socket* sock;
shared_ptr<trsp_socket> sock;
public:
transport(trsp_socket* sock, bool triggers_ready = false):
AmThread(triggers_ready), sock(sock)
transport(const shared_ptr<trsp_socket>& sock, bool triggers_ready = false):
AmThread(triggers_ready), sock(sock)
{}
virtual ~transport();
};

@ -247,7 +247,7 @@ int udp_trsp_socket::send(const sockaddr_storage* sa,
/** @see trsp_socket */
udp_trsp::udp_trsp(udp_trsp_socket* sock)
udp_trsp::udp_trsp(const shared_ptr<udp_trsp_socket>& sock)
: transport(sock, true)
{
iov[0].iov_base = buf;
@ -324,7 +324,6 @@ void udp_trsp::run()
}
s_msg->local_socket = sock;
inc_ref(sock);
for (cmsghdr* cmsgptr = CMSG_FIRSTHDR(&msg);
cmsgptr != NULL;

@ -112,7 +112,7 @@ protected:
public:
/** @see transport */
udp_trsp(udp_trsp_socket* sock);
udp_trsp(const shared_ptr<udp_trsp_socket>& sock);
~udp_trsp();
};

Loading…
Cancel
Save