Codebase list librelp / debian/1.0.3-1 src / sendq.c
debian/1.0.3-1

Tree @debian/1.0.3-1 (Download .tar.gz)

sendq.c @debian/1.0.3-1raw · history · blame

/* The relp sendqueue object.
 *
 * Copyright 2008 by Rainer Gerhards and Adiscon GmbH.
 *
 * This file is part of librelp.
 *
 * Librelp is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Librelp is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Librelp.  If not, see <http://www.gnu.org/licenses/>.
 *
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
 *
 * If the terms of the GPL are unsuitable for your needs, you may obtain
 * a commercial license from Adiscon. Contact sales@adiscon.com for further
 * details.
 *
 * ALL CONTRIBUTORS PLEASE NOTE that by sending contributions, you assign
 * your copyright to Adiscon GmbH, Germany. This is necessary to permit the
 * dual-licensing set forth here. Our apologies for this inconvenience, but
 * we sincerely believe that the dual-licensing model helps us provide great
 * free software while at the same time obtaining some funding for further
 * development.
 */
#include "config.h"
#include <stdlib.h>
#include <assert.h>
#include <sys/types.h>
#include "relp.h"
#include "sendq.h"
#include "dbllinklist.h"


/* handling for the sendq entries  */

/** Construct a RELP sendq instance
 * This is the first thing that a caller must do before calling any
 * RELP function. The relp sendq must only destructed after all RELP
 * operations have been finished.
 */
static relpRetVal
relpSendqeConstruct(relpSendqe_t **ppThis, relpEngine_t *pEngine)
{
	relpSendqe_t *pThis;

	ENTER_RELPFUNC;
	assert(ppThis != NULL);
	if((pThis = calloc(1, sizeof(relpSendqe_t))) == NULL) {
		ABORT_FINALIZE(RELP_RET_OUT_OF_MEMORY);
	}

	RELP_CORE_CONSTRUCTOR(pThis, Sendqe);
	pThis->pEngine = pEngine;

	*ppThis = pThis;

finalize_it:
	LEAVE_RELPFUNC;
}


/** Destruct a RELP sendq instance
 */
static relpRetVal
relpSendqeDestruct(relpSendqe_t **ppThis)
{
	relpSendqe_t *pThis;

	ENTER_RELPFUNC;
	assert(ppThis != NULL);
	pThis = *ppThis;
	RELPOBJ_assert(pThis, Sendqe);

	relpSendbufDestruct(&pThis->pBuf);

	/* done with de-init work, now free sendqe object itself */
	free(pThis);
	*ppThis = NULL;

	LEAVE_RELPFUNC;
}



/* handling for the sendq itself */


/** Construct a RELP sendq instance
 * This is the first thing that a caller must do before calling any
 * RELP function. The relp sendq must only destructed after all RELP
 * operations have been finished.
 */
relpRetVal
relpSendqConstruct(relpSendq_t **ppThis, relpEngine_t *pEngine)
{
	relpSendq_t *pThis;

	ENTER_RELPFUNC;
	assert(ppThis != NULL);
	if((pThis = calloc(1, sizeof(relpSendq_t))) == NULL) {
		ABORT_FINALIZE(RELP_RET_OUT_OF_MEMORY);
	}

	RELP_CORE_CONSTRUCTOR(pThis, Sendq);
	pThis->pEngine = pEngine;
	pthread_mutex_init(&pThis->mut, NULL);

	*ppThis = pThis;

finalize_it:
	LEAVE_RELPFUNC;
}


/** Destruct a RELP sendq instance
 */
relpRetVal
relpSendqDestruct(relpSendq_t **ppThis)
{
	relpSendq_t *pThis;

	ENTER_RELPFUNC;
	assert(ppThis != NULL);
	pThis = *ppThis;
	RELPOBJ_assert(pThis, Sendq);

	pthread_mutex_destroy(&pThis->mut);
	/* done with de-init work, now free sendq object itself */
	free(pThis);
	*ppThis = NULL;

	LEAVE_RELPFUNC;
}


/* add a sendbuffer to the queue
 * the send buffer object is handed over, caller must no longer access it after
 * it has been passed into here.
 * rgerhards, 2008-03-17
 */
relpRetVal
relpSendqAddBuf(relpSendq_t *pThis, relpSendbuf_t *pBuf, relpTcp_t *pTcp)
{
	relpSendqe_t *pEntry;

	ENTER_RELPFUNC;
	RELPOBJ_assert(pThis, Sendq);
	RELPOBJ_assert(pBuf, Sendbuf);
	RELPOBJ_assert(pTcp, Tcp);

	CHKRet(relpSendqeConstruct(&pEntry, pThis->pEngine));
	pEntry->pBuf = pBuf;

	pthread_mutex_lock(&pThis->mut);
	DLL_Add(pEntry, pThis->pRoot, pThis->pLast);
	pthread_mutex_unlock(&pThis->mut);

	/* we now try to send it. We always have non-blocking sockets in the server
	 * so it doesn't hurt if that's not possible. But if it is, we save ourselvs
	 * one round of select() (and this is assumed to be the regular case!)
	 */
	iRet = relpSendqSend(pThis, pTcp);
	if(iRet == RELP_RET_PARTIAL_WRITE)
		iRet = RELP_RET_OK; /* this code is well ok! */

finalize_it:
	LEAVE_RELPFUNC;
}


/* remove the sendbuffer at the top of the queue. The removed buffer is destructed.
 * This must only be called if there is at least one send buffer inside the queue.
 * rgerhards, 2008-03-17
 */
relpRetVal
relpSendqDelFirstBuf(relpSendq_t *pThis)
{
	relpSendqe_t *pEntry;

	ENTER_RELPFUNC;
	RELPOBJ_assert(pThis, Sendq);
	RELPOBJ_assert(pThis->pRoot, Sendqe);

	pthread_mutex_lock(&pThis->mut);
	pEntry = pThis->pRoot;
	DLL_Del(pEntry, pThis->pRoot, pThis->pLast);
	pthread_mutex_unlock(&pThis->mut);

	CHKRet(relpSendqeDestruct(&pEntry));

finalize_it:
	LEAVE_RELPFUNC;
}


/* Returns if the sendq is empty (boolean true) or not (0).
 * Note the differet calling conventions - no relpRetVal but the
 * boolen value. This is much more convenient, especially as nothing
 * can go wrong with this simply object access method.
 * Note that we must lock the mutex because an add operation may
 * be in progress concurrently (TODO: revisit this once we are done
 * with the complete implementation, all these mutex operations may
 * not really be necessary).
 * rgerhards, 2008-03-19
 */
int
relpSendqIsEmpty(relpSendq_t *pThis)
{
	int ret;

	/* TODO: an atomic operation would also do nicely... */
	pthread_mutex_lock(&pThis->mut);
	ret = pThis->pRoot == NULL;
	pthread_mutex_unlock(&pThis->mut);
pThis->pEngine->dbgprint("relpSendqIsEmpty() returns %d\n", ret);
	return ret;
}


/* Sends as much data from the top of the sendq as possible.
 * This function tries to send as much data from the top of the queue
 * as possible, destroying all sendbuf's that have been processed.
 * Please note that if a sendbuf is too large to be sent in
 * a single operation, a partial buffer may be sent.
 * rgerhards, 2008-03-19
 */
relpRetVal
relpSendqSend(relpSendq_t *pThis, relpTcp_t *pTcp)
{
	relpSendqe_t *pEntry;
	relpRetVal localRet;

	ENTER_RELPFUNC;
	RELPOBJ_assert(pThis, Sendq);
	RELPOBJ_assert(pTcp,  Tcp);

	pEntry = pThis->pRoot;

	while(pEntry != NULL) {
		RELPOBJ_assert(pEntry, Sendqe);
		localRet = relpSendbufSend(pEntry->pBuf, pTcp);
		if(localRet == RELP_RET_OK) {
			/* in this case, the full buffer has been sent and we can
			 * destruct the sendbuf.
			 */
			CHKRet(relpSendqDelFirstBuf(pThis));
			pEntry = pThis->pRoot; /* as we deleted from the root, the is our next entry */
		} else if(localRet != RELP_RET_PARTIAL_WRITE) {
			ABORT_FINALIZE(localRet);
		}
	}

finalize_it:
	LEAVE_RELPFUNC;
}