introduce struct udp_fd

git.mgm/mediaproxy-ng/2.2
Richard Fuchs 12 years ago
parent 4c14016f9f
commit a67f03d1d3

@ -126,7 +126,7 @@ static void stream_closed(int fd, void *p, uintptr_t u) {
mutex_lock(&cs->lock); mutex_lock(&cs->lock);
r = &cs->peers[u >> 1].rtps[u & 1]; r = &cs->peers[u >> 1].rtps[u & 1];
assert(r->fd == fd); assert(r->fd.fd == fd);
mutex_unlock(&cs->lock); mutex_unlock(&cs->lock);
c = cs->call; c = cs->call;
@ -166,12 +166,12 @@ void kernelize(struct callstream *c) {
r = &p->rtps[j]; r = &p->rtps[j];
rp = &pp->rtps[j]; rp = &pp->rtps[j];
if (IN6_IS_ADDR_UNSPECIFIED(&r->peer.ip46) || !r->fd_family || !r->peer.port) if (IN6_IS_ADDR_UNSPECIFIED(&r->peer.ip46) || !r->fd.fd_family || !r->peer.port)
continue; continue;
ks.local_port = r->localport; ks.local_port = r->fd.localport;
ks.tos = cm->conf.tos; ks.tos = cm->conf.tos;
ks.src.port = rp->localport; ks.src.port = rp->fd.localport;
ks.dest.port = r->peer.port; ks.dest.port = r->peer.port;
if (IN6_IS_ADDR_V4MAPPED(&r->peer.ip46)) { if (IN6_IS_ADDR_V4MAPPED(&r->peer.ip46)) {
@ -226,9 +226,9 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_
m = c->callmaster; m = c->callmaster;
smart_ntop_port(addr, fsin, sizeof(addr)); smart_ntop_port(addr, fsin, sizeof(addr));
if (p->fd == -1) { if (p->fd.fd == -1) {
mylog(LOG_WARNING, LOG_PREFIX_C "RTP packet to port %u discarded from %s", mylog(LOG_WARNING, LOG_PREFIX_C "RTP packet to port %u discarded from %s",
LOG_PARAMS_C(c), r->localport, addr); LOG_PARAMS_C(c), r->fd.localport, addr);
r->stats.errors++; r->stats.errors++;
mutex_lock(&m->statspslock); mutex_lock(&m->statspslock);
m->statsps.errors++; m->statsps.errors++;
@ -243,7 +243,7 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_
goto peerinfo; goto peerinfo;
mylog(LOG_DEBUG, LOG_PREFIX_C "Confirmed peer information for port %u - %s", mylog(LOG_DEBUG, LOG_PREFIX_C "Confirmed peer information for port %u - %s",
LOG_PARAMS_C(c), r->localport, addr); LOG_PARAMS_C(c), r->fd.localport, addr);
pe->confirmed = 1; pe->confirmed = 1;
@ -269,7 +269,7 @@ peerinfo:
update = 1; update = 1;
forward: forward:
if (IN6_IS_ADDR_UNSPECIFIED(&r->peer.ip46) || !r->peer.port || !r->fd_family) if (IN6_IS_ADDR_UNSPECIFIED(&r->peer.ip46) || !r->peer.port || !r->fd.fd_family)
goto drop; goto drop;
ZERO(mh); ZERO(mh);
@ -279,7 +279,7 @@ forward:
ch = CMSG_FIRSTHDR(&mh); ch = CMSG_FIRSTHDR(&mh);
ZERO(*ch); ZERO(*ch);
switch (r->fd_family) { switch (r->fd.fd_family) {
case AF_INET: case AF_INET:
ZERO(sin); ZERO(sin);
sin.sin_family = AF_INET; sin.sin_family = AF_INET;
@ -335,7 +335,7 @@ ipv4_src:
mh.msg_iov = &iov; mh.msg_iov = &iov;
mh.msg_iovlen = 1; mh.msg_iovlen = 1;
ret = sendmsg(p->fd, &mh, 0); ret = sendmsg(p->fd.fd, &mh, 0);
if (ret == -1) { if (ret == -1) {
r->stats.errors++; r->stats.errors++;
@ -380,7 +380,7 @@ static void stream_readable(int fd, void *p, uintptr_t u) {
mutex_lock(&cs->lock); mutex_lock(&cs->lock);
r = &cs->peers[u >> 1].rtps[u & 1]; r = &cs->peers[u >> 1].rtps[u & 1];
if (r->fd != fd) if (r->fd.fd != fd)
goto out; goto out;
for (;;) { for (;;) {
@ -399,7 +399,7 @@ static void stream_readable(int fd, void *p, uintptr_t u) {
if (ret >= sizeof(buf)) if (ret >= sizeof(buf))
mylog(LOG_WARNING, "UDP packet possibly truncated"); mylog(LOG_WARNING, "UDP packet possibly truncated");
if (ss.ss_family != r->fd_family) if (ss.ss_family != r->fd.fd_family)
abort(); abort();
sinp = &ss; sinp = &ss;
@ -522,11 +522,11 @@ static void call_timer_iterator(void *key, void *val, void *ptr) {
p = &cs->peers[i]; p = &cs->peers[i];
for (j = 0; j < 2; j++) { for (j = 0; j < 2; j++) {
sr = &p->rtps[j]; sr = &p->rtps[j];
if (!sr->localport) if (!sr->fd.localport)
continue; continue;
if (hlp->ports[sr->localport]) if (hlp->ports[sr->fd.localport])
abort(); abort();
hlp->ports[sr->localport] = sr; hlp->ports[sr->fd.localport] = sr;
obj_hold(cs); obj_hold(cs);
if (good) if (good)
@ -801,7 +801,7 @@ fail:
static int get_port4(struct streamrelay *r, u_int16_t p, struct callmaster *m) { static int get_port4(struct udp_fd *r, u_int16_t p, struct callmaster *m) {
int fd; int fd;
struct sockaddr_in sin; struct sockaddr_in sin;
@ -830,7 +830,7 @@ fail:
return -1; return -1;
} }
static int get_port6(struct streamrelay *r, u_int16_t p, struct callmaster *m) { static int get_port6(struct udp_fd *r, u_int16_t p, struct callmaster *m) {
int fd; int fd;
struct sockaddr_in6 sin; struct sockaddr_in6 sin;
int tos; int tos;
@ -866,7 +866,7 @@ fail:
return -1; return -1;
} }
static int get_port(struct streamrelay *r, u_int16_t p, struct callmaster *m) { static int get_port(struct udp_fd *r, u_int16_t p, struct callmaster *m) {
int ret; int ret;
assert(r->fd == -1); assert(r->fd == -1);
@ -896,7 +896,7 @@ static int get_port(struct streamrelay *r, u_int16_t p, struct callmaster *m) {
return 0; return 0;
} }
static void release_port(struct streamrelay *r, struct callmaster *m) { static void release_port(struct udp_fd *r, struct callmaster *m) {
if (r->fd == -1 || !r->localport) if (r->fd == -1 || !r->localport)
return; return;
mutex_lock(&m->portlock); mutex_lock(&m->portlock);
@ -907,9 +907,9 @@ static void release_port(struct streamrelay *r, struct callmaster *m) {
r->localport = 0; r->localport = 0;
} }
static int get_consecutive_ports(struct streamrelay *array, int array_len, int wanted_start_port, struct call *c) { static int get_consecutive_ports(struct udp_fd *array, int array_len, int wanted_start_port, struct call *c) {
int i, j, cycle = 0; int i, j, cycle = 0;
struct streamrelay *it; struct udp_fd *it;
u_int16_t port; u_int16_t port;
struct callmaster *m = c->callmaster; struct callmaster *m = c->callmaster;
@ -1029,7 +1029,7 @@ static void steal_peer(struct peer *dest, struct peer *src) {
po = m->poller; po = m->poller;
mylog(LOG_DEBUG, LOG_PREFIX_CI "Re-using existing open RTP port %u", mylog(LOG_DEBUG, LOG_PREFIX_CI "Re-using existing open RTP port %u",
LOG_PARAMS_CI(c), r->localport); LOG_PARAMS_CI(c), r->fd.localport);
dest->confirmed = 0; dest->confirmed = 0;
unkernelize(dest); unkernelize(dest);
@ -1047,33 +1047,25 @@ static void steal_peer(struct peer *dest, struct peer *src) {
sr = &dest->rtps[i]; sr = &dest->rtps[i];
srs = &src->rtps[i]; srs = &src->rtps[i];
if (sr->fd != -1) { if (sr->fd.fd != -1) {
mylog(LOG_DEBUG, LOG_PREFIX_CI "Closing port %u in favor of re-use", mylog(LOG_DEBUG, LOG_PREFIX_CI "Closing port %u in favor of re-use",
LOG_PARAMS_CI(c), sr->localport); LOG_PARAMS_CI(c), sr->fd.localport);
poller_del_item(po, sr->fd); poller_del_item(po, sr->fd.fd);
release_port(sr, m); release_port(&sr->fd, m);
} }
sr->fd = srs->fd; sr->fd = srs->fd;
sr->fd_family = srs->fd_family; sr->peer = srs->peer;
sr->peer_advertised = srs->peer_advertised;
sr->peer.ip46 = srs->peer.ip46;
sr->peer.port = srs->peer.port;
sr->peer_advertised.ip46 = srs->peer_advertised.ip46;
sr->peer_advertised.port = srs->peer_advertised.port;
sr->localport = srs->localport; srs->fd.fd = -1;
srs->fd.fd_family = 0;
srs->fd.localport = 0;
ZERO(srs->peer);
ZERO(srs->peer_advertised);
pi.fd = sr->fd.fd;
srs->fd = -1;
srs->fd_family = 0;
ZERO(srs->peer.ip46);
srs->peer.port = 0;
srs->localport = 0;
ZERO(srs->peer_advertised.ip46);
srs->peer_advertised.port = 0;
pi.fd = sr->fd;
pi.obj = &sr->up->up->obj; pi.obj = &sr->up->up->obj;
pi.uintp = i | (dest->idx << 1); pi.uintp = i | (dest->idx << 1);
pi.readable = stream_readable; pi.readable = stream_readable;
@ -1084,12 +1076,11 @@ static void steal_peer(struct peer *dest, struct peer *src) {
} }
/* XXX no need for a full streamrelay struct, split it */
/* XXX return value? */
void callstream_init(struct callstream *s, struct relays_cache *rc) { void callstream_init(struct callstream *s, struct relays_cache *rc) {
int i, j; int i, j;
struct peer *p; struct peer *p;
struct streamrelay *r, *relay_AB; struct streamrelay *r;
struct udp_fd *relay_AB;
struct poller_item pi; struct poller_item pi;
struct call *c = s->call; struct call *c = s->call;
struct poller *po = c->callmaster->poller; struct poller *po = c->callmaster->poller;
@ -1107,17 +1098,15 @@ void callstream_init(struct callstream *s, struct relays_cache *rc) {
for (j = 0; j < 2; j++) { for (j = 0; j < 2; j++) {
r = &p->rtps[j]; r = &p->rtps[j];
r->fd = -1; r->fd.fd = -1;
r->idx = j; r->idx = j;
r->up = p; r->up = p;
r->last = poller_now; r->last = poller_now;
if (relay_AB && relay_AB[j].fd != -1) { if (relay_AB && relay_AB[j].fd != -1) {
r->fd = relay_AB[j].fd; r->fd = relay_AB[j];
r->fd_family = relay_AB[j].fd_family;
r->localport = relay_AB[j].localport;
pi.fd = r->fd; pi.fd = r->fd.fd;
pi.obj = &s->obj; pi.obj = &s->obj;
pi.uintp = (i << 1) | j; pi.uintp = (i << 1) | j;
pi.readable = stream_readable; pi.readable = stream_readable;
@ -1148,7 +1137,7 @@ static void callstream_free(void *ptr) {
for (j = 0; j < 2; j++) { for (j = 0; j < 2; j++) {
r = &p->rtps[j]; r = &p->rtps[j];
release_port(r, m); release_port(&r->fd, m);
} }
} }
mutex_destroy(&s->lock); mutex_destroy(&s->lock);
@ -1404,7 +1393,7 @@ static void unkernelize(struct peer *p) {
for (i = 0; i < 2; i++) { for (i = 0; i < 2; i++) {
r = &p->rtps[i]; r = &p->rtps[i];
kernel_del_stream(p->up->call->callmaster->conf.kernelfd, r->localport); kernel_del_stream(p->up->call->callmaster->conf.kernelfd, r->fd.localport);
} }
p->kernelized = 0; p->kernelized = 0;
@ -1425,8 +1414,8 @@ static void kill_callstream(struct callstream *s) {
for (j = 0; j < 2; j++) { for (j = 0; j < 2; j++) {
r = &p->rtps[j]; r = &p->rtps[j];
if (r->fd != -1) if (r->fd.fd != -1)
poller_del_item(s->call->callmaster->poller, r->fd); poller_del_item(s->call->callmaster->poller, r->fd.fd);
} }
} }
} }
@ -1464,13 +1453,13 @@ static void call_destroy(struct call *c) {
"RTP[%u] %lu p, %lu b, %lu e; " "RTP[%u] %lu p, %lu b, %lu e; "
"RTCP[%u] %lu p, %lu b, %lu e", "RTCP[%u] %lu p, %lu b, %lu e",
LOG_PARAMS_C(c), LOG_PARAMS_C(c),
s->peers[0].rtps[0].localport, s->peers[0].rtps[0].stats.packets, s->peers[0].rtps[0].fd.localport, s->peers[0].rtps[0].stats.packets,
s->peers[0].rtps[0].stats.bytes, s->peers[0].rtps[0].stats.errors, s->peers[0].rtps[0].stats.bytes, s->peers[0].rtps[0].stats.errors,
s->peers[0].rtps[1].localport, s->peers[0].rtps[1].stats.packets, s->peers[0].rtps[1].fd.localport, s->peers[0].rtps[1].stats.packets,
s->peers[0].rtps[1].stats.bytes, s->peers[0].rtps[1].stats.errors, s->peers[0].rtps[1].stats.bytes, s->peers[0].rtps[1].stats.errors,
s->peers[1].rtps[0].localport, s->peers[1].rtps[0].stats.packets, s->peers[1].rtps[0].fd.localport, s->peers[1].rtps[0].stats.packets,
s->peers[1].rtps[0].stats.bytes, s->peers[1].rtps[0].stats.errors, s->peers[1].rtps[0].stats.bytes, s->peers[1].rtps[0].stats.errors,
s->peers[1].rtps[1].localport, s->peers[1].rtps[1].stats.packets, s->peers[1].rtps[1].fd.localport, s->peers[1].rtps[1].stats.packets,
s->peers[1].rtps[1].stats.bytes, s->peers[1].rtps[1].stats.errors); s->peers[1].rtps[1].stats.bytes, s->peers[1].rtps[1].stats.errors);
kill_callstream(s); kill_callstream(s);
mutex_unlock(&s->lock); mutex_unlock(&s->lock);
@ -1570,7 +1559,7 @@ static str *streams_print(GQueue *s, int num, enum call_opmode opmode, const cha
for (i = 0, l = s->head; i < num && l; i++, l = l->next) { for (i = 0, l = s->head; i < num && l; i++, l = l->next) {
t = l->data; t = l->data;
x = &t->peers[off].rtps[0]; x = &t->peers[off].rtps[0];
g_string_append_printf(o, (format == 1) ? "%u " : " %u", x->localport); g_string_append_printf(o, (format == 1) ? "%u " : " %u", x->fd.localport);
} }
if (format == SAF_UDP) { if (format == SAF_UDP) {
@ -2029,7 +2018,7 @@ static void call_status_iterator(struct call *c, struct control_stream *s) {
rx1 = &p->rtps[1]; rx1 = &p->rtps[1];
rx2 = &cs->peers[1].rtps[1]; rx2 = &cs->peers[1].rtps[1];
if (r1->fd == -1 || r2->fd == -1) if (r1->fd.fd == -1 || r2->fd.fd == -1)
goto next; goto next;
smart_ntop_p(addr1, &r1->peer.ip46, sizeof(addr1)); smart_ntop_p(addr1, &r1->peer.ip46, sizeof(addr1));
@ -2042,7 +2031,7 @@ static void call_status_iterator(struct call *c, struct control_stream *s) {
control_stream_printf(s, "stream %s:%u %s:%u %s:%u %llu/%llu/%llu %s %s - %i\n", control_stream_printf(s, "stream %s:%u %s:%u %s:%u %llu/%llu/%llu %s %s - %i\n",
addr1, r1->peer.port, addr1, r1->peer.port,
addr2, r2->peer.port, addr2, r2->peer.port,
addr3, r1->localport, addr3, r1->fd.localport,
(long long unsigned int) r1->stats.bytes + rx1->stats.bytes, (long long unsigned int) r1->stats.bytes + rx1->stats.bytes,
(long long unsigned int) r2->stats.bytes + rx2->stats.bytes, (long long unsigned int) r2->stats.bytes + rx2->stats.bytes,
(long long unsigned int) r1->stats.bytes + rx1->stats.bytes + r2->stats.bytes + rx2->stats.bytes, (long long unsigned int) r1->stats.bytes + rx1->stats.bytes + r2->stats.bytes + rx2->stats.bytes,

@ -60,12 +60,15 @@ struct stream_input {
struct stream stream; struct stream stream;
enum stream_direction direction[2]; enum stream_direction direction[2];
}; };
struct streamrelay { struct udp_fd {
int fd; int fd;
int fd_family; int fd_family;
u_int16_t localport;
};
struct streamrelay {
struct udp_fd fd;
struct stream peer; struct stream peer;
struct stream peer_advertised; struct stream peer_advertised;
u_int16_t localport;
unsigned char idx; unsigned char idx;
struct peer *up; struct peer *up;
struct stats stats; struct stats stats;
@ -73,9 +76,9 @@ struct streamrelay {
time_t last; time_t last;
}; };
struct relays_cache { struct relays_cache {
struct streamrelay relays_A[16]; struct udp_fd relays_A[16];
struct streamrelay relays_B[16]; struct udp_fd relays_B[16];
struct streamrelay *array_ptrs[2]; struct udp_fd *array_ptrs[2];
int relays_open; int relays_open;
}; };
struct peer { struct peer {

@ -407,7 +407,7 @@ static int replace_port(struct string_chopper *chop, str *port, GList *m, int of
if (copy_up_to(chop, port)) if (copy_up_to(chop, port))
return -1; return -1;
g_string_append_printf(chop->output, "%hu", sr->localport); g_string_append_printf(chop->output, "%hu", sr->fd.localport);
if (skip_over(chop, port)) if (skip_over(chop, port))
return -1; return -1;

Loading…
Cancel
Save