MT#62181 add systemd support

Change-Id: I67a22e96dbf97e79a63278481c005a8df8c7dcf7
master
Richard Fuchs 3 weeks ago
parent 200ba57bbd
commit 12317f7116

@ -27,6 +27,7 @@
#include "AmThread.h"
#include "log.h"
#include "SdNotify.h"
#include <unistd.h>
#include "errno.h"
@ -46,6 +47,10 @@ void AmThread::_start()
{
_pid = static_cast<unsigned long>(_td.native_handle());
DBG("Thread %lu is starting.\n", _pid);
if (!_triggers_ready)
ready();
run();
DBG("Thread %lu is ending.\n", _pid);
@ -66,6 +71,20 @@ void AmThread::start()
_td = std::thread(&AmThread::_start, this);
}
void AmThread::start(SdNotifier& sd)
{
_sd_notifier = sd;
sd.waiter();
start();
}
void AmThread::ready()
{
if (!_sd_notifier)
return;
_sd_notifier->get().running();
}
void AmThread::stop()
{
state expected = state::running;

@ -37,10 +37,13 @@
#include <atomic>
#include <condition_variable>
#include <thread>
#include <optional>
using std::lock_guard;
using std::atomic_bool;
class SdNotifier;
/**
* \brief Wrapper class for std::mutex
*/
@ -127,6 +130,9 @@ class AmThread
void _start();
/** Initialise to true in subclasses that call the ready() method */
bool _triggers_ready;
protected:
virtual void run()=0;
virtual void on_stop() {};
@ -134,18 +140,25 @@ protected:
/** @return true if this thread ought to stop. */
bool stop_requested() { return _state == stopping; }
std::optional<std::reference_wrapper<SdNotifier>> _sd_notifier;
/** Signal that the thread is ready to do its job */
void ready();
public:
unsigned long _pid;
AmThread()
AmThread(bool triggers_ready = false)
: _state(state::idle),
_joined(false)
_joined(false),
_triggers_ready(triggers_ready)
{}
virtual ~AmThread();
/** Start it ! */
void start();
void start(SdNotifier&);
/** Stop it ! */
void stop();

@ -50,7 +50,7 @@ core: $(OBJS) ../Makefile.defs
include ../Makefile.defs
CPPFLAGS += -I$(COREPATH) -fno-strict-aliasing
LDFLAGS += -levent -levent_pthreads
LDFLAGS += -levent -levent_pthreads -lsystemd
ifdef USE_LIBSAMPLERATE
CPPFLAGS += -DUSE_LIBSAMPLERATE

@ -0,0 +1,41 @@
#include "SdNotify.h"
#include <systemd/sd-daemon.h>
void
SdNotifier::ready()
{
std::unique_lock<std::mutex> _l(_lock);
while (waiters)
_cond.wait(_l);
sd_notify(0, "READY=1");
}
void
SdNotifier::stopping()
{
sd_notify(0, "STOPPING=1");
}
void
SdNotifier::status(const std::string &s)
{
sd_notifyf(0, "STATUS=%s\n", s.c_str());
}
void
SdNotifier::waiter()
{
std::lock_guard<std::mutex> _l(_lock);
waiters++;
}
void
SdNotifier::running()
{
std::lock_guard<std::mutex> _l(_lock);
waiters--;
if (!waiters)
_cond.notify_one();
}

@ -0,0 +1,25 @@
#ifndef _SdNotify_h_
#define _SdNotify_h_
#include <string>
#include <mutex>
#include <condition_variable>
class SdNotifier
{
public:
void ready();
void stopping();
void status(const std::string &s);
void waiter(); // increases waiter count by one
void running(); // decreases waiter count by one
private:
std::mutex _lock;
std::condition_variable _cond;
unsigned int waiters = 0;
};
#endif // _SdNotify_h_

@ -371,27 +371,23 @@ int _SipCtrlInterface::send(AmSipRequest &req, const string& dialog_id,
return res;
}
int _SipCtrlInterface::run()
int _SipCtrlInterface::run(SdNotifier& sd)
{
DBG("Starting SIP control interface\n");
wheeltimer::instance()->start();
if (NULL != udp_servers) {
for(int i=0; i<nr_udp_servers;i++){
udp_servers[i]->start();
udp_servers[i]->start(sd);
}
}
if (NULL != tcp_servers) {
for(int i=0; i<nr_tcp_servers;i++){
tcp_servers[i]->start();
tcp_servers[i]->start(sd);
}
}
while (!stopped.get()) {
stopped.wait_for();
}
DBG("SIP control interface ending\n");
return 0;
}

@ -94,7 +94,7 @@ public:
int load();
int run();
int run(SdNotifier&);
void stop();
void cleanup();

@ -35,6 +35,7 @@
#include "AmEventDispatcher.h"
#include "AmSessionProcessor.h"
#include "AmAppTimer.h"
#include "SdNotify.h"
#ifdef WITH_ZRTP
# include "AmZRTP.h"
@ -348,6 +349,8 @@ int main(int argc, char* argv[])
int fd[2] = {0,0};
#endif
SdNotifier sd;
progname = strrchr(argv[0], '/');
progname = (progname == NULL ? argv[0] : progname + 1);
@ -646,9 +649,14 @@ int main(int argc, char* argv[])
INFO("SEMS " SEMS_VERSION " (" ARCH "/" OS") started");
// running the server
if(sip_ctrl.run() != -1)
if(sip_ctrl.run(sd) != -1)
success = true;
sd.ready();
while (!is_shutting_down.get())
is_shutting_down.wait_for();
// session container stops active sessions
INFO("Disposing session container\n");
AmSessionContainer::dispose();
@ -667,6 +675,9 @@ int main(int argc, char* argv[])
AmEventDispatcher::dispose();
error:
sd.stopping();
INFO("Disposing plug-ins\n");
AmPlugIn::dispose();

@ -698,10 +698,10 @@ void tcp_server_socket::add_threads(unsigned int n)
}
}
void tcp_server_socket::start_threads()
void tcp_server_socket::start_threads(SdNotifier& sd)
{
for(unsigned int i=0; i<workers.size(); i++) {
workers[i]->start();
workers[i]->start(sd);
}
}
@ -777,7 +777,7 @@ struct timeval* tcp_server_socket::get_idle_timeout()
}
tcp_trsp::tcp_trsp(tcp_server_socket* sock)
: transport(sock)
: transport(sock, true)
{
evbase = event_base_new();
sock->add_event(evbase);
@ -800,11 +800,13 @@ void tcp_trsp::run()
}
tcp_server_socket* tcp_sock = static_cast<tcp_server_socket*>(sock);
tcp_sock->start_threads();
tcp_sock->start_threads(*_sd_notifier);
INFO("Started SIP server TCP transport on %s:%i\n",
sock->get_ip(),sock->get_port());
ready();
/* Start the event loop. */
int ret = event_base_dispatch(evbase);

@ -238,7 +238,7 @@ public:
~tcp_server_socket() {}
void add_threads(unsigned int n);
void start_threads();
void start_threads(SdNotifier&);
void stop_threads();
const char* get_transport() const { return "tcp"; }

@ -159,7 +159,9 @@ protected:
trsp_socket* sock;
public:
transport(trsp_socket* sock): sock(sock) {}
transport(trsp_socket* sock, bool triggers_ready = false):
AmThread(triggers_ready), sock(sock)
{}
virtual ~transport();
};

@ -248,7 +248,7 @@ int udp_trsp_socket::send(const sockaddr_storage* sa,
/** @see trsp_socket */
udp_trsp::udp_trsp(udp_trsp_socket* sock)
: transport(sock)
: transport(sock, true)
{
iov[0].iov_base = buf;
iov[0].iov_len = MAX_UDP_MSGLEN;
@ -282,6 +282,8 @@ void udp_trsp::run()
INFO("Started SIP server UDP transport on %s:%i\n",
sock->get_ip(),sock->get_port());
ready();
while (!stop_requested()){
buf_len = recvmsg(sock->get_sd(),&msg,0);
if(buf_len <= 0){

1
debian/control vendored

@ -18,6 +18,7 @@ Build-Depends:
libspandsp-dev,
libspeex-dev,
libssl-dev,
libsystemd-dev,
libxml2-dev,
openssl,
pkgconf,

@ -8,7 +8,7 @@ Wants=remote-fs.target
Wants=mariadb.service
[Service]
Type=simple
Type=notify
User=sems
Group=sems
Environment=LD_LIBRARY_PATH=/usr/lib/sems-pbx

Loading…
Cancel
Save