- added support for multiple SIP/UDP receivers.

- some cleanups.


git-svn-id: http://svn.berlios.de/svnroot/repos/sems/trunk@1804 8eb893ce-cfd4-0310-b710-fb5ebe64c474
sayer/1.4-spce2.6
Raphael Coeffic 16 years ago
parent 38283be162
commit d31ef86751

@ -53,6 +53,7 @@ int AmConfig::RtpLowPort = RTP_LOWPORT;
int AmConfig::RtpHighPort = RTP_HIGHPORT;
int AmConfig::SessionProcessorThreads = NUM_SESSION_PROCESSORS;
int AmConfig::MediaProcessorThreads = NUM_MEDIA_PROCESSORS;
int AmConfig::SIPServerThreads = NUM_SIP_SERVERS;
int AmConfig::LocalSIPPort = 5060;
string AmConfig::LocalSIPIP = "";
string AmConfig::OutboundProxy = "";
@ -150,6 +151,14 @@ int AmConfig::setMediaProcessorThreads(const string& th) {
return 1;
}
int AmConfig::setSIPServerThreads(const string& th){
if(sscanf(th.c_str(),"%u",&SIPServerThreads) != 1) {
return 0;
}
return 1;
}
int AmConfig::setDeadRtpTime(const string& drt)
{
if(sscanf(drt.c_str(),"%u",&AmConfig::DeadRtpTime) != 1) {

@ -79,6 +79,8 @@ struct AmConfig
static int SessionProcessorThreads;
/** number of media processor threads */
static int MediaProcessorThreads;
/** number of SIP server threads */
static int SIPServerThreads;
/** the interface SIP requests are sent from - needed for registrar_client */
static string LocalSIPIP;
/** the port SIP requests are sent from - optional (default 5060) */
@ -159,6 +161,8 @@ struct AmConfig
static int setSessionProcessorThreads(const string& th);
/** Setter for parameter MediaProcessorThreads, returns 0 on invalid value */
static int setMediaProcessorThreads(const string& th);
/** Setter for parameter SIPServerThreads, returns 0 on invalid value */
static int setSIPServerThreads(const string& th);
/** Setter for parameter DeadRtpTime, returns 0 on invalid value */
static int setDeadRtpTime(const string& drt);
};

@ -69,13 +69,19 @@ AmSipDialog::~AmSipDialog()
void AmSipDialog::updateStatus(const AmSipRequest& req)
{
if (req.method == "ACK")
if (req.method == "ACK") {
// || (req.method == "CANCEL")
return;
}
if(uas_trans.find(req.cseq) == uas_trans.end()){
DBG("req.tt = {%p,%p}\n",req.tt._bucket, req.tt._t);
uas_trans[req.cseq] = AmSipTransaction(req.method,req.cseq,req.tt);
}
// else {
// // shouldn't we drop those requests?
// // (CANCEL requests should have been handled before)
// }
// target refresh requests
if (req.from_uri.length() &&
@ -334,7 +340,12 @@ int AmSipDialog::reply(const AmSipRequest& req,
if(updateStatusReply(req,code))
return -1;
return SipCtrlInterface::send(reply);
int ret = SipCtrlInterface::send(reply);
if(ret){
ERROR("Could not send reply: code=%i; reason='%s'; method=%s; call-id=%s; cseq=%i\n",
reply.code,reply.reason.c_str(),req.method.c_str(),req.callid.c_str(),req.cseq);
}
return ret;
}
/* static */
@ -353,7 +364,12 @@ int AmSipDialog::reply_error(const AmSipRequest& req, unsigned int code,
if (AmConfig::Signature.length())
reply.hdrs += SIP_HDR_COLSP(SIP_HDR_SERVER) + AmConfig::Signature + CRLF;
return SipCtrlInterface::send(reply);
int ret = SipCtrlInterface::send(reply);
if(ret){
ERROR("Could not send reply: code=%i; reason='%s'; method=%s; call-id=%s; cseq=%i\n",
reply.code,reply.reason.c_str(),req.method.c_str(),req.callid.c_str(),req.cseq);
}
return ret;
}
@ -609,15 +625,15 @@ int AmSipDialog::sendRequest(const string& method,
}
bool AmSipDialog::match_cancel(const AmSipRequest& cancel_req)
{
TransMap::iterator t = uas_trans.find(cancel_req.cseq);
// bool AmSipDialog::match_cancel(const AmSipRequest& cancel_req)
// {
// TransMap::iterator t = uas_trans.find(cancel_req.cseq);
if((t != uas_trans.end()) && (t->second.method == "INVITE"))
return true;
// if((t != uas_trans.end()) && (t->second.method == "INVITE"))
// return true;
return false;
}
// return false;
// }
string AmSipDialog::get_uac_trans_method(unsigned int cseq)
{

@ -193,7 +193,7 @@ class AmSipDialog
* @return true if a transaction could be found that
* matches the CANCEL's one.
*/
bool match_cancel(const AmSipRequest& cancel_req);
//bool match_cancel(const AmSipRequest& cancel_req);
/**
* @return the method of the corresponding uac request

@ -70,9 +70,14 @@ void AmSipDispatcher::handleSipMsg(AmSipRequest &req)
if(!ev_disp->post(local_tag,ev)) {
delete ev;
AmSipDialog::reply_error(req,481,
"Call leg/Transaction does not exist");
if(req.method != "ACK") {
AmSipDialog::reply_error(req,481,
"Call leg/Transaction does not exist");
}
else {
ERROR("received ACK for non-existing dialog (callid=%s;remote_tag=%s;local_tag=%s)\n",
callid.c_str(),remote_tag.c_str(),local_tag.c_str());
}
}
return;
@ -84,7 +89,6 @@ void AmSipDispatcher::handleSipMsg(AmSipRequest &req)
}
DBG("method: `%s' [%zd].\n", req.method.c_str(), req.method.length());
if(req.method == "INVITE"){
AmSessionContainer::instance()->startSessionUAS(req);

@ -288,15 +288,27 @@ void SipCtrlInterface::run(const string& bind_addr, unsigned short bind_port)
{
INFO("Starting SIP control interface\n");
udp_trsp* udp_server = new udp_trsp(trans_layer::instance());
udp_trsp_socket* udp_socket = new udp_trsp_socket;
udp_socket->bind(bind_addr,bind_port);
trans_layer::instance()->register_transport(udp_socket);
udp_trsp** udp_servers = new udp_trsp*[AmConfig::SIPServerThreads];
trans_layer::instance()->register_transport(udp_server);
udp_server->bind(bind_addr,bind_port);
wheeltimer::instance()->start();
udp_server->start();
udp_server->join();
for(int i=0; i<AmConfig::SIPServerThreads;i++){
udp_servers[i] = new udp_trsp(udp_socket);
udp_servers[i]->start();
}
for(int i=0; i<AmConfig::SIPServerThreads;i++){
udp_servers[i]->join();
delete udp_servers[i];
}
delete [] udp_servers;
delete udp_socket;
}
int SipCtrlInterface::send(const AmSipReply &rep)

@ -93,8 +93,12 @@ void set_log_facility(const char* facility) {
}
}
AmMutex dprint_mut;
void dprint(int level, const char* fct, const char* file, int line, const char* fmt, ...)
{
dprint_mut.lock();
va_list ap;
#ifndef _DEBUG
@ -106,6 +110,8 @@ void dprint(int level, const char* fct, const char* file, int line, const char*
vfprintf(stderr,fmt,ap);
fflush(stderr);
va_end(ap);
dprint_mut.unlock();
}
void log_print (int level, const char* fmt, ...)

@ -98,21 +98,23 @@ static void sig_usr_un(int signo)
static AmCondition<bool> need_clean(true);
clean_up_mut.lock();
if(need_clean.get()) {
need_clean.set(false);
clean_up_mut.unlock();
AmSessionContainer::dispose();
AmRtpReceiver::dispose();
//AmServer::dispose();
AmMediaProcessor::dispose();
AmEventDispatcher::dispose();
}
clean_up_mut.unlock();
else {
clean_up_mut.unlock();
}
INFO("Finished.\n");

@ -56,6 +56,8 @@
#define NUM_SESSION_PROCESSORS 10
// threads to start for RTP processing
#define NUM_MEDIA_PROCESSORS 1
// number of SIP servers to start
#define NUM_SIP_SERVERS 4
#define MAX_NET_DEVICES 32

@ -82,7 +82,7 @@ void trans_layer::register_ua(sip_ua* ua)
this->ua = ua;
}
void trans_layer::register_transport(udp_trsp* trsp)
void trans_layer::register_transport(trsp_socket* trsp)
{
transport = trsp;
}
@ -123,7 +123,7 @@ int trans_layer::send_reply(trans_ticket* tt,
bucket->lock();
if(!bucket->exist(t)){
bucket->unlock();
ERROR("Invalid transaction key: transaction does not exist\n");
ERROR("Invalid transaction key: transaction does not exist (%p;%p)\n",bucket,t);
return -1;
}
@ -744,9 +744,9 @@ int trans_layer::send_request(sip_msg* msg, trans_ticket* tt)
compute_branch(branch_buf,msg->callid->value,msg->cseq->value);
cstring branch(branch_buf,BRANCH_BUF_LEN);
string via(transport->get_local_ip());
if(transport->get_local_port() != 5060)
via += ":" + int2str(transport->get_local_port());
string via(transport->get_ip());
if(transport->get_port() != 5060)
via += ":" + int2str(transport->get_port());
// add 'rport' parameter defaultwise? yes, for now
request_len += via_len(stl2cstr(via),branch,true);
@ -877,9 +877,9 @@ int trans_layer::cancel(trans_ticket* tt)
compute_branch(branch_buf,req->callid->value,get_cseq(req)->num_str);
cstring branch(branch_buf,BRANCH_BUF_LEN);
string via(transport->get_local_ip());
if(transport->get_local_port() != 5060)
via += ":" + int2str(transport->get_local_port());
string via(transport->get_ip());
if(transport->get_port() != 5060)
via += ":" + int2str(transport->get_port());
//TODO: add 'rport' parameter by default?

@ -41,7 +41,7 @@ struct sip_header;
struct sockaddr_storage;
class trans_bucket;
class udp_trsp;
class trsp_socket;
class sip_ua;
class timer;
@ -72,8 +72,8 @@ class trans_layer
*/
static trans_layer* _instance;
sip_ua* ua;
udp_trsp* transport;
sip_ua* ua;
trsp_socket* transport;
/** Avoid external instantiation. @see instance(). */
@ -143,7 +143,7 @@ class trans_layer
* Register a transport instance.
* This method MUST be called ONCE.
*/
void register_transport(udp_trsp* trsp);
void register_transport(trsp_socket* trsp);
/**
* Sends a UAS reply.

@ -25,17 +25,76 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "transport.h"
#include "../SipCtrlInterface.h"
#include "../log.h"
#include <assert.h>
#include <netinet/in.h>
transport::transport(trans_layer* tl)
: tl(tl)
trsp_socket::trsp_socket()
: sd(0), ip(), port(0)
{
assert(tl);
memset(&addr,0,sizeof(sockaddr_storage));
}
transport::~transport()
trsp_socket::~trsp_socket()
{
}
const char* trsp_socket::get_ip()
{
return ip.c_str();
}
unsigned short trsp_socket::get_port()
{
return port;
}
void trsp_socket::copy_addr_to(sockaddr_storage* sa)
{
memcpy(sa,&addr,sizeof(sockaddr_storage));
}
int trsp_socket::get_sd()
{
return sd;
}
int trsp_socket::send(const sockaddr_storage* sa, const char* msg, const int msg_len)
{
if ((SipCtrlInterface::log_raw_messages >= 0)
&& (SipCtrlInterface::log_raw_messages <= log_level)) {
_LOG(SipCtrlInterface::log_raw_messages,
"send msg\n--++--\n%.*s--++--\n", msg_len, msg);
}
int err;
#ifdef SUPPORT_IPV6
if (sa->ss_family == AF_INET6) {
err = sendto(sd, msg, msg_len, 0, (const struct sockaddr*)sa, sizeof(sockaddr_in6));
}
else {
#endif
err = sendto(sd, msg, msg_len, 0, (const struct sockaddr*)sa, sizeof(sockaddr_in));
#ifdef SUPPORT_IPV6
}
#endif
if (err < 0) {
ERROR("sendto: %s\n",strerror(errno));
return err;
}
else if (err != msg_len) {
ERROR("sendto: sent %i instead of %i bytes\n", err, msg_len);
return -1;
}
return 0;
}
transport::~transport()
{
}
/** EMACS **

@ -28,46 +28,77 @@
#define _transport_h_
#include "../AmThread.h"
#include <string>
#include <sys/socket.h>
#include <string>
using std::string;
class trans_layer;
struct sockaddr_storage;
#define SAv4(addr) \
((struct sockaddr_in*)addr)
#define SAv6(addr) \
((struct sockaddr_in6*)addr)
class transport: public AmThread
class trsp_socket
{
protected:
/**
* Transaction layer pointer.
* This is used for received messages.
*/
trans_layer* tl;
protected:
// socket descriptor
int sd;
public:
transport(trans_layer* tl);
// bound address
sockaddr_storage addr;
// bound IP
string ip;
// bound port number
unsigned short port;
public:
trsp_socket();
virtual ~trsp_socket();
virtual ~transport();
/**
* Binds the transport server to an address
* Binds the transport socket to an address
* @return -1 if error(s) occured.
*/
virtual int bind(const string& address, unsigned short port)=0;
/**
* Getter for IP address
*/
const char* get_ip();
/**
* Getter for the port number
*/
unsigned short get_port();
/**
* Getter for the socket descriptor
*/
int get_sd();
/**
* Copy the internal address into the given one (sa).
*/
void copy_addr_to(sockaddr_storage* sa);
/**
* Sends a message.
* @return -1 if error(s) occured.
*/
virtual int send(const sockaddr_storage* sa, const char* msg, const int msg_len)=0;
virtual int send(const sockaddr_storage* sa, const char* msg, const int msg_len);
};
virtual const char* get_local_ip()=0;
virtual unsigned short get_local_port()=0;
virtual void copy_local_addr(sockaddr_storage* sa)=0;
class transport: public AmThread
{
protected:
trsp_socket* sock;
public:
transport(trsp_socket* sock): sock(sock) {}
virtual ~transport();
};
#endif

@ -58,33 +58,108 @@
// u_char data[DSTADDR_DATASIZE];
// };
udp_trsp::udp_trsp(trans_layer* tl)
: transport(tl), sd(0)
/** @see trsp_socket */
int udp_trsp_socket::bind(const string& bind_ip, unsigned short bind_port)
{
tl->register_transport(this);
}
if(sd){
WARN("re-binding socket\n");
close(sd);
}
memset(&addr,0,sizeof(addr));
udp_trsp::~udp_trsp()
{
}
addr.ss_family = AF_INET;
#if defined(BSD44SOCKETS)
addr.ss_len = sizeof(struct sockaddr_in);
#endif
SAv4(&addr)->sin_port = htons(bind_port);
if(inet_aton(bind_ip.c_str(),&SAv4(&addr)->sin_addr)<0){
ERROR("inet_aton: %s\n",strerror(errno));
return -1;
}
if(SAv4(&addr)->sin_addr.s_addr == INADDR_ANY){
ERROR("Sorry, we cannot bind 'ANY' address\n");
return -1;
}
const char* udp_trsp::get_local_ip()
{
return local_ip.c_str();
if((sd = socket(PF_INET,SOCK_DGRAM,0)) == -1){
ERROR("socket: %s\n",strerror(errno));
return -1;
}
if(::bind(sd,(const struct sockaddr*)&addr,
sizeof(struct sockaddr_in))) {
ERROR("bind: %s\n",strerror(errno));
close(sd);
return -1;
}
int true_opt = 1;
if(setsockopt(sd, SOL_SOCKET, SO_REUSEADDR,
(void*)&true_opt, sizeof (true_opt)) == -1) {
ERROR("%s\n",strerror(errno));
close(sd);
return -1;
}
if(setsockopt(sd, IPPROTO_IP, DSTADDR_SOCKOPT,
(void*)&true_opt, sizeof (true_opt)) == -1) {
ERROR("%s\n",strerror(errno));
close(sd);
return -1;
}
if (SipCtrlInterface::udp_rcvbuf > 0) {
DBG("trying to set SIP UDP socket buffer to %d\n",
SipCtrlInterface::udp_rcvbuf);
if(setsockopt(sd, SOL_SOCKET, SO_RCVBUF,
(void*)&SipCtrlInterface::udp_rcvbuf,
sizeof (SipCtrlInterface::udp_rcvbuf)) == -1) {
WARN("could not set SIP UDP socket buffer: '%s'\n",
strerror(errno));
} else {
socklen_t optlen;
int set_rcvbuf_size=0;
if (getsockopt(sd, SOL_SOCKET, SO_RCVBUF,
&set_rcvbuf_size, &optlen) == -1) {
WARN("could not read back SIP UDP socket buffer length: '%s'\n",
strerror(errno));
} else {
if (set_rcvbuf_size != SipCtrlInterface::udp_rcvbuf) {
WARN("failed to set SIP UDP RCVBUF size (wanted %d, got %d)\n",
SipCtrlInterface::udp_rcvbuf, set_rcvbuf_size);
}
}
}
}
port = bind_port;
ip = bind_ip;
DBG("UDP transport bound to %s:%i\n",ip.c_str(),port);
return 0;
}
unsigned short udp_trsp::get_local_port()
/** @see trsp_socket */
udp_trsp::udp_trsp(udp_trsp_socket* sock)
: transport(sock)
{
return local_port;
}
void udp_trsp::copy_local_addr(sockaddr_storage* sa)
udp_trsp::~udp_trsp()
{
memcpy(sa,&local_addr,sizeof(sockaddr_storage));
}
/** @see AmThread */
void udp_trsp::run()
{
@ -107,16 +182,18 @@ void udp_trsp::run()
msg.msg_control = new u_char[DSTADDR_DATASIZE];
msg.msg_controllen = DSTADDR_DATASIZE;
if(sd<=0){
if(sock->get_sd()<=0){
ERROR("Transport instance not bound\n");
return;
}
DBG("Started UDP server listening to %s:%i\n",sock->get_ip(),sock->get_port());
while(true){
DBG("before recvmsg (%s:%i)\n",local_ip.c_str(),local_port);
//DBG("before recvmsg (%s:%i)\n",sock->get_ip(),sock->get_port());
buf_len = recvmsg(sd,&msg,0);
buf_len = recvmsg(sock->get_sd(),&msg,0);
if(buf_len <= 0){
ERROR("recvfrom returned %d: %s\n",buf_len,strerror(errno));
switch(errno){
@ -148,13 +225,13 @@ void udp_trsp::run()
cmsgptr->cmsg_type == DSTADDR_SOCKOPT) {
s_msg->local_ip.ss_family = AF_INET;
((sockaddr_in*)(&s_msg->local_ip))->sin_port = htons(local_port);
((sockaddr_in*)(&s_msg->local_ip))->sin_port = htons(sock->get_port());
memcpy(&((sockaddr_in*)(&s_msg->local_ip))->sin_addr,dstaddr(cmsgptr),sizeof(in_addr));
}
}
// pass message to the parser / transaction layer
tl->received_msg(s_msg);
trans_layer::instance()->received_msg(s_msg);
}
}
@ -165,127 +242,6 @@ void udp_trsp::on_stop()
}
/** @see transport */
int udp_trsp::bind(const string& address, unsigned short port)
{
if(sd){
WARN("re-binding socket\n");
close(sd);
}
memset(&local_addr,0,sizeof(local_addr));
local_addr.ss_family = AF_INET;
#if defined(BSD44SOCKETS)
local_addr.ss_len = sizeof(struct sockaddr_in);
#endif
SAv4(&local_addr)->sin_port = htons(port);
if(inet_aton(address.c_str(),&SAv4(&local_addr)->sin_addr)<0){
ERROR("inet_aton: %s\n",strerror(errno));
return -1;
}
if(SAv4(&local_addr)->sin_addr.s_addr == INADDR_ANY){
ERROR("Sorry, we cannot bind 'ANY' address\n");
return -1;
}
if((sd = socket(PF_INET,SOCK_DGRAM,0)) == -1){
ERROR("socket: %s\n",strerror(errno));
return -1;
}
if(::bind(sd,(const struct sockaddr*)&local_addr,
sizeof(struct sockaddr_in))) {
ERROR("bind: %s\n",strerror(errno));
close(sd);
return -1;
}
int true_opt = 1;
if(setsockopt(sd, SOL_SOCKET, SO_REUSEADDR,
(void*)&true_opt, sizeof (true_opt)) == -1) {
ERROR("%s\n",strerror(errno));
close(sd);
return -1;
}
if(setsockopt(sd, IPPROTO_IP, DSTADDR_SOCKOPT,
(void*)&true_opt, sizeof (true_opt)) == -1) {
ERROR("%s\n",strerror(errno));
close(sd);
return -1;
}
if (SipCtrlInterface::udp_rcvbuf > 0) {
DBG("trying to set SIP UDP socket buffer to %d\n",
SipCtrlInterface::udp_rcvbuf);
if(setsockopt(sd, SOL_SOCKET, SO_RCVBUF,
(void*)&SipCtrlInterface::udp_rcvbuf,
sizeof (SipCtrlInterface::udp_rcvbuf)) == -1) {
WARN("could not set SIP UDP socket buffer: '%s'\n",
strerror(errno));
} else {
socklen_t optlen;
int set_rcvbuf_size=0;
if (getsockopt(sd, SOL_SOCKET, SO_RCVBUF,
&set_rcvbuf_size, &optlen) == -1) {
WARN("could not read back SIP UDP socket buffer length: '%s'\n",
strerror(errno));
} else {
if (set_rcvbuf_size != SipCtrlInterface::udp_rcvbuf) {
WARN("failed to set SIP UDP RCVBUF size (wanted %d, got %d)\n",
SipCtrlInterface::udp_rcvbuf, set_rcvbuf_size);
}
}
}
}
local_port = port;
local_ip = address;
DBG("UDP transport bound to %s:%i\n",address.c_str(),port);
return 0;
}
/** @see transport */
int udp_trsp::send(const sockaddr_storage* sa, const char* msg, const int msg_len)
{
if ((SipCtrlInterface::log_raw_messages >= 0)
&& (SipCtrlInterface::log_raw_messages <=log_level)) {
_LOG(SipCtrlInterface::log_raw_messages,
"send msg\n--++--\n%.*s--++--\n", msg_len, msg);
}
int err;
#ifdef SUPPORT_IPV6
if (sa->ss_family == AF_INET6) {
err = sendto(sd, msg, msg_len, 0, (const struct sockaddr*)sa, sizeof(sockaddr_in6));
}
else {
#endif
err = sendto(sd, msg, msg_len, 0, (const struct sockaddr*)sa, sizeof(sockaddr_in));
#ifdef SUPPORT_IPV6
}
#endif
if (err < 0) {
ERROR("sendto: %s\n",strerror(errno));
return err;
}
else if (err != msg_len) {
ERROR("sendto: sent %i instead of %i bytes\n", err, msg_len);
return -1;
}
return 0;
}
/** EMACS **
* Local variables:

@ -40,40 +40,31 @@
#include <string>
using std::string;
class udp_trsp: public transport
class udp_trsp_socket: public trsp_socket
{
// socket descriptor
int sd;
// bound port number
unsigned short local_port;
// bound IP
string local_ip;
public:
udp_trsp_socket() : trsp_socket() {}
~udp_trsp_socket() {}
// bound address
sockaddr_storage local_addr;
/**
* Binds the transport socket to an address
* @return -1 if error(s) occured.
*/
virtual int bind(const string& address, unsigned short port);
};
protected:
class udp_trsp: public transport
{
protected:
/** @see AmThread */
void run();
/** @see AmThread */
void on_stop();
public:
public:
/** @see transport */
udp_trsp(trans_layer* tl);
udp_trsp(udp_trsp_socket* sock);
~udp_trsp();
/** @see transport */
int bind(const string& address, unsigned short port);
/** @see transport */
int send(const sockaddr_storage* sa, const char* msg, const int msg_len);
const char* get_local_ip();
unsigned short get_local_port();
void copy_local_addr(sockaddr_storage* sa);
};
#endif

Loading…
Cancel
Save