|
|
|
|
@ -3262,6 +3262,38 @@ done:
|
|
|
|
|
log_info_pop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void stream_fd_recv(struct obj *obj, char *buf, size_t len, struct sockaddr *sa, struct timeval *tv) {
|
|
|
|
|
struct stream_fd *sfd = (struct stream_fd *) obj;
|
|
|
|
|
call_t *ca = sfd->call;
|
|
|
|
|
if (!ca)
|
|
|
|
|
goto out;
|
|
|
|
|
|
|
|
|
|
rwlock_lock_r(&ca->master_lock);
|
|
|
|
|
|
|
|
|
|
if (sfd->socket.fd == -1) {
|
|
|
|
|
rwlock_unlock_r(&ca->master_lock);
|
|
|
|
|
goto out;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log_info_stream_fd(sfd);
|
|
|
|
|
|
|
|
|
|
rwlock_unlock_r(&ca->master_lock);
|
|
|
|
|
|
|
|
|
|
struct packet_handler_ctx phc;
|
|
|
|
|
ZERO(phc);
|
|
|
|
|
phc.mp.sfd = sfd;
|
|
|
|
|
sfd->socket.family->sockaddr2endpoint(&phc.mp.fsin, sa);
|
|
|
|
|
str_init_len(&phc.s, buf, len);
|
|
|
|
|
|
|
|
|
|
__stream_fd_readable(&phc);
|
|
|
|
|
|
|
|
|
|
if (phc.update)
|
|
|
|
|
redis_update_onekey(ca, rtpe_redis_write);
|
|
|
|
|
|
|
|
|
|
out:
|
|
|
|
|
log_info_pop();
|
|
|
|
|
bufferpool_unref(buf);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -3292,6 +3324,7 @@ stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) {
|
|
|
|
|
pi.fd = sfd->socket.fd;
|
|
|
|
|
pi.obj = &sfd->obj;
|
|
|
|
|
pi.readable = stream_fd_readable;
|
|
|
|
|
pi.recv = stream_fd_recv;
|
|
|
|
|
pi.closed = stream_fd_closed;
|
|
|
|
|
|
|
|
|
|
if (sfd->socket.fd != -1) {
|
|
|
|
|
|