You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
sems/core/plug-in/binrpcctrl/CtrlServer.cpp

118 lines
2.9 KiB

#include "AmUtils.h"
#include "log.h"
#include "CtrlServer.h"
CtrlServer::CtrlServer(const string &listen, unsigned listeners,
brpc_tv_t rx_timeout, brpc_tv_t tx_timeout) :
wcnt(listeners)
{
brpc_addr_t *addr;
if (! (addr = brpc_parse_uri(listen.c_str()))) {
throw "failed to parse BINRPC URI `" + listen + "': " +
string(brpc_strerror()) + " [" + int2str(brpc_errno) + "]";
} else if (BRPC_ADDR_TYPE(addr) != SOCK_DGRAM) {
//b/c we'd have to do connection management otherwise (listen for
//connections, poll for activity on each descriptor etc); for now, not
//relly needed and the impl. is much easier.
throw "only datagram listeners supported";
} else {
rxAddr = *addr;
}
if ((rxFd = brpc_socket(addr, /*blocking*/false, /*bind*/true)) < 0)
throw "failed to get listen socket for URI `" + listen + "': " +
string(brpc_strerror()) + " [" + int2str(brpc_errno) + "].\n";
workers = new CtrlWorker[wcnt]();
for (unsigned i = 0; i < wcnt; i ++)
workers[i].init(rxFd, rxAddr, rx_timeout, tx_timeout);
}
CtrlServer::~CtrlServer()
{
INFO("closing SEMS listener FD#%d for %s.\n", rxFd,
brpc_print_addr(&rxAddr));
if (close(rxFd) < 0)
ERROR("CtrlServer server socket#%d closed uncleanly: %s [%d].\n", rxFd,
strerror(errno), errno);
if (BRPC_ADDR_DOMAIN(&rxAddr) == PF_LOCAL) {
if (unlink(BRPC_ADDR_UN(&rxAddr)->sun_path) < 0) {
ERROR("failed to remove unix socket file '%s': %s [%d].\n",
BRPC_ADDR_UN(&rxAddr)->sun_path, strerror(errno), errno);
}
}
delete []workers;
}
void CtrlServer::start()
{
for (unsigned i = 0; i < wcnt; i ++)
workers[i].start();
INFO("CtrlServer started.\n");
}
void CtrlServer::stop()
{
INFO("CtrlServer stopping.\n");
for (unsigned i = 0; i < wcnt; i ++)
workers[i].stop();
}
void CtrlServer::join()
{
for (unsigned i = 0; i < wcnt; i ++)
workers[i].join();
INFO("CtrlServer stopped.\n");
}
CtrlWorker::CtrlWorker() :
rxFd(-1)
{}
void CtrlWorker::init(int rxFd, brpc_addr_t rxAddr,
brpc_tv_t rx_timeout, brpc_tv_t tx_timeout)
{
this->rxFd = rxFd;
this->rxAddr = rxAddr;
this->rx_timeout = rx_timeout;
this->tx_timeout = tx_timeout;
}
void CtrlWorker::run()
{
brpc_addr_t from;
brpc_t *req, *rpl;
INFO("CtrlServer worker #%lx started.\n", pthread_self());
running = 1;
do {
from = rxAddr; // avoid a syscall to find out socket type
if (! (req = brpc_recvfrom(rxFd, &from, rx_timeout)))
continue;
//unsafe
DBG("received BINRPC request `%.*s'.\n", BRPC_STR_FMT(brpc_method(req)));
if ((rpl = brpc_cb_run(req))) {
if (! brpc_sendto(rxFd, &from, rpl, tx_timeout)) {
ERROR("failed to send reply to BINRPC request: %s [%d].\n",
brpc_strerror(), brpc_errno);
}
brpc_finish(rpl);
}
brpc_finish(req);
} while (running);
INFO("CtrlServer worker #%lx stopped.\n", pthread_self());
}
void CtrlWorker::on_stop()
{
running = 0;
}