You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kamailio/modules/tls/sbufq.h

292 lines
7.6 KiB

/*
* $Id$
*
* Copyright (C) 2010 iptelorg GmbH
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/** minimal overhead buffer queue in shm memory.
* @file modules/tls/sbufq.h
* @ingroup: tls
* Module: @ref tls
*/
/*
* History:
* --------
* 2010-03-31 initial version, based on tcp_conn.h tcp_wbuffer_queue (andrei)
*/
#ifndef __sbufq_h
#define __sbufq_h
#include "../../compiler_opt.h"
#include "../../ut.h"
#include "../../mem/shm_mem.h"
#include "../../timer_ticks.h"
#include "../../timer.h"
#include "../../dprint.h"
#include <string.h>
struct sbuf_elem {
struct sbuf_elem* next;
unsigned int b_size; /**< buf size */
char buf[1]; /**< variable size buffer */
};
struct sbuffer_queue {
struct sbuf_elem* first;
struct sbuf_elem* last;
ticks_t last_chg; /**< last change (creation time or partial flush)*/
unsigned int queued; /**< total size */
unsigned int offset; /**< offset in the first buffer where unflushed data
starts */
unsigned int last_used; /**< how much of the last buffer is used */
};
/* sbufq_flush() output flags */
#define F_BUFQ_EMPTY 1
#define F_BUFQ_ERROR_FLUSH 2
#define sbufq_empty(bq) ((bq)->first==0)
#define sbufq_non_empty(bq) ((bq)->first!=0)
/** adds/appends data to a buffer queue.
* WARNING: it does no attempt to synchronize access/lock. If needed it should
* be called under lock.
* @param q - buffer queue
* @param data
* @param size
* @param min_buf_size - min size to allocate for new buffer elements
* @return 0 on success, -1 on error (mem. allocation)
*/
inline static int sbufq_add(struct sbuffer_queue* q, const void* data,
unsigned int size, unsigned int min_buf_size)
{
struct sbuf_elem* b;
unsigned int last_free;
unsigned int b_size;
unsigned int crt_size;
ticks_t t;
t=get_ticks_raw();
if (likely(q->last==0)) {
b_size=MAX_unsigned(min_buf_size, size);
b=shm_malloc(sizeof(*b)+b_size-sizeof(b->buf));
if (unlikely(b==0))
goto error;
b->b_size=b_size;
b->next=0;
q->last=b;
q->first=b;
q->last_used=0;
q->offset=0;
q->last_chg=get_ticks_raw();
last_free=b_size;
crt_size=size;
goto data_cpy;
}else{
b=q->last;
}
while(size){
last_free=b->b_size-q->last_used;
if (last_free==0){
b_size=MAX_unsigned(min_buf_size, size);
b=shm_malloc(sizeof(*b)+b_size-sizeof(b->buf));
if (unlikely(b==0))
goto error;
b->b_size=b_size;
b->next=0;
q->last->next=b;
q->last=b;
q->last_used=0;
last_free=b->b_size;
}
crt_size=MIN_unsigned(last_free, size);
data_cpy:
memcpy(b->buf+q->last_used, data, crt_size);
q->last_used+=crt_size;
size-=crt_size;
data+=crt_size;
q->queued+=crt_size;
}
return 0;
error:
return -1;
}
/** inserts data (at the beginning) in a buffer queue.
* Note: should never be called after sbufq_run().
* WARNING: it does no attempt to synchronize access/lock. If needed it should
* be called under lock.
* @param q - buffer queue
* @param data
* @param size
* @param min_buf_size - min size to allocate for new buffer elements
* @return 0 on success, -1 on error (mem. allocation)
*/
inline static int sbufq_insert(struct sbuffer_queue* q, const void* data,
unsigned int size, unsigned int min_buf_size)
{
struct sbuf_elem* b;
if (likely(q->first==0)) /* if empty, use sbufq_add */
return sbufq_add(q, data, size, min_buf_size);
if (unlikely(q->offset)){
LOG(L_CRIT, "BUG: non-null offset %d (bad call, should"
"never be called after sbufq_run())\n", q->offset);
goto error;
}
if ((q->first==q->last) && ((q->last->b_size-q->last_used)>=size)){
/* one block with enough space in it for size bytes */
memmove(q->first->buf+size, q->first->buf, size);
memcpy(q->first->buf, data, size);
q->last_used+=size;
}else{
/* create a size bytes block directly */
b=shm_malloc(sizeof(*b)+size-sizeof(b->buf));
if (unlikely(b==0))
goto error;
b->b_size=size;
/* insert it */
b->next=q->first;
q->first=b;
memcpy(b->buf, data, size);
}
q->queued+=size;
return 0;
error:
return -1;
}
/** destroy a buffer queue.
* Only the content is destroyed (shm_free()'d), the queue head is
* re-intialized.
* WARNING: it does no attempt to synchronize access/lock. If needed it should
* be called under lock.
* @param q - buffer queue
* @return - number of bytes that used to be queued (>=0).
*/
inline static unsigned int sbufq_destroy(struct sbuffer_queue* q)
{
struct sbuf_elem* b;
struct sbuf_elem* next_b;
int unqueued;
unqueued=0;
if (likely(q->first)){
b=q->first;
do{
next_b=b->next;
unqueued+=(b==q->last)?q->last_used:b->b_size;
if (b==q->first)
unqueued-=q->offset;
shm_free(b);
b=next_b;
}while(b);
}
memset(q, 0, sizeof(*q));
return unqueued;
}
/** tries to flush the queue.
* Tries to flush as much as possible from the given queue, using the
* given callback.
* WARNING: it does no attempt to synchronize access/lock. If needed it should
* be called under lock.
* @param q - buffer queue
* @param *flags - set to:
* F_BUFQ_EMPTY if the queued is completely flushed
* F_BUFQ_ERROR_FLUSH if the flush_f callback returned error.
* @param flush_f - flush function (callback). modeled after write():
* flush_f(param1, param2, const void* buf, unsigned size).
* It should return the number of bytes "flushed" on
* success, or <0 on error. If the number of bytes
* "flushed" is smaller then the requested size, it
* would be assumed that no more bytes can be flushed
* and sbufq_flush will exit.
* @param flush_p1 - parameter for the flush function callback.
* @param flush_p2 - parameter for the flush function callback.
* @return -1 on internal error, or the number of bytes flushed on
* success (>=0). Note that the flags param is
* always set and it should be used to check for errors, since
* a flush_f() failure will not result in a negative return.
*/
inline static int sbufq_flush(struct sbuffer_queue* q, int* flags,
int (*flush_f)(void* p1, void* p2,
const void* buf,
unsigned size),
void* flush_p1, void* flush_p2)
{
struct sbuf_elem *b;
int n;
int ret;
int block_size;
char* buf;
*flags=0;
ret=0;
while(q->first){
block_size=((q->first==q->last)?q->last_used:q->first->b_size)-
q->offset;
buf=q->first->buf+q->offset;
n=flush_f(flush_p1, flush_p2, buf, block_size);
if (likely(n>0)){
ret+=n;
if (likely(n==block_size)){
b=q->first;
q->first=q->first->next;
shm_free(b);
q->offset=0;
q->queued-=block_size;
}else{
q->offset+=n;
q->queued-=n;
/* no break: if we are here n < block_size => partial write
=> the write should be retried */
}
}else{
if (unlikely(n<0))
*flags|=F_BUFQ_ERROR_FLUSH;
break;
}
}
if (likely(q->first==0)){
q->last=0;
q->last_used=0;
q->offset=0;
*flags|=F_BUFQ_EMPTY;
}
return ret;
}
#endif /*__sbufq_h*/
/* vi: set ts=4 sw=4 tw=79:ai:cindent: */