Codebase list pd-iemnet / fresh-snapshots/main iemnet_sender.c
fresh-snapshots/main

Tree @fresh-snapshots/main (Download .tar.gz)

iemnet_sender.c @fresh-snapshots/mainraw · history · blame

/* iemnet
 *
 * sender
 *   sends data "chunks" to a socket
 *   possibly threaded
 *
 *  copyright © 2010-2015 IOhannes m zmölnig, IEM
 */

/* This program is free software; you can redistribute it and/or                */
/* modify it under the terms of the GNU General Public License                  */
/* as published by the Free Software Foundation; either version 2               */
/* of the License, or (at your option) any later version.                       */
/*                                                                              */
/* This program is distributed in the hope that it will be useful,              */
/* but WITHOUT ANY WARRANTY; without even the implied warranty of               */
/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the                */
/* GNU General Public License for more details.                                 */
/*                                                                              */
/* You should have received a copy of the GNU General Public License            */
/* along with this program; if not, see                                         */
/*     http://www.gnu.org/licenses/                                             */
/*                                                                              */

#define DEBUGLEVEL 2

#include "iemnet.h"
#include "iemnet_data.h"


#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#include <sys/types.h>

#ifdef _WIN32
# include <winsock2.h>
# include <ws2tcpip.h> /* for socklen_t */
#else
# include <sys/socket.h>
#endif

#include <pthread.h>

#if IEMNET_HAVE_DEBUG
static int debug_lockcount=0;
# define LOCK(x) do {if(iemnet_debug(DEBUGLEVEL, __FILE__, __LINE__, __FUNCTION__))post("  LOCKing %p", x); pthread_mutex_lock(x);debug_lockcount++;  if(iemnet_debug(DEBUGLEVEL, __FILE__, __LINE__, __FUNCTION__))post("  LOCKed  %p[%d]", x, debug_lockcount); } while(0)
# define UNLOCK(x) do {debug_lockcount--;if(iemnet_debug(DEBUGLEVEL, __FILE__, __LINE__, __FUNCTION__))post("  UNLOCK %p [%d]", x, debug_lockcount); pthread_mutex_unlock(x);}while(0)
#else
# define LOCK(x) pthread_mutex_lock(x)
# define UNLOCK(x) pthread_mutex_unlock(x)
#endif

/* draft:
 *   - there is a sender thread for each open connection
 *   - the main thread just adds chunks to each sender threads processing queue
 *   - the sender thread tries to send the queue as fast as possible
 */

struct _iemnet_sender {
  pthread_t thread;

  int sockfd; /* owned outside; must call iemnet__sender_destroy() before freeing socket yourself */
  t_iemnet_queue*queue;
  int keepsending; // indicates whether we want to thread to continue or to terminate
  int isrunning;

  const void*userdata; /* user provided data */
  t_iemnet_sendfunction sendfun; /* user provided send function */

  pthread_mutex_t mtx; /* mutex to protect isrunning,.. */
};

/* the workhorse of the family */


static int iemnet__sender_defaultsend(const void*x, int sockfd,
                                      t_iemnet_chunk*c)
{
  int result=-1;

  struct sockaddr_in  to;
  socklen_t           tolen = sizeof(to);

  unsigned char*data=c->data;
  unsigned int size=c->size;

  int flags = 0;
#ifdef __linux__
  flags |= MSG_NOSIGNAL;
#endif


  //    fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd);
  if(c->port) {
    DEBUG("sending %d bytes to %x:%d @%d", size, c->addr, c->port, c->family);

    to.sin_addr.s_addr=htonl(c->addr);
    to.sin_port       =htons(c->port);
    to.sin_family     =c->family;
    result = sendto(sockfd,
                    data, size, /* DATA */
                    flags,      /* FLAGS */
                    (struct sockaddr *)&to, tolen); /* DESTADDR */
  } else {
    DEBUG("sending %d bytes", size);
    result = send(sockfd,
                  data, size, /* DATA */
                  flags);     /* FLAGS */
  }
  if(result<0) {
    // broken pipe
    return 0;
  }

  // shouldn't we do something with the result here?
  DEBUG("sent %d bytes", result);
  return 1;
}

static void*iemnet__sender_sendthread(void*arg)
{
  t_iemnet_sender*sender=(t_iemnet_sender*)arg;

  int sockfd=-1;
  t_iemnet_queue*q=NULL;
  t_iemnet_chunk*c=NULL;
  t_iemnet_sendfunction dosend=iemnet__sender_defaultsend;
  const void*userdata=NULL;

  LOCK(&sender->mtx);
  q=sender->queue;
  userdata=sender->userdata;
  if(NULL!=sender->sendfun) {
    dosend=sender->sendfun;
  }

  sockfd=sender->sockfd;


  while(sender->keepsending) {
    UNLOCK(&sender->mtx);

    c=queue_pop_block(q);
    if(c) {
      if(!dosend(userdata, sockfd, c)) {
        iemnet__chunk_destroy(c);

        LOCK(&sender->mtx);
        break;
      }
      iemnet__chunk_destroy(c);
      c=NULL;
    }
    LOCK(&sender->mtx);
  }
  sender->isrunning=0;
  UNLOCK (&sender->mtx);
  DEBUG("send thread terminated");
  return NULL;
}

int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c)
{
  t_iemnet_queue*q=0;
  int size=-1;
  LOCK (&s->mtx);
  q=s->queue;
  if(!s->isrunning) {
    UNLOCK (&s->mtx);
    return -1;
  }
  UNLOCK (&s->mtx);
  if(q) {
    t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c);
    size = queue_push(q, chunk);
  }
  return size;
}

void iemnet__sender_destroy(t_iemnet_sender*s, int subthread)
{
  int sockfd=-1;
  /* simple protection against recursive calls:
   * s->keepsending is only set to "0" in here,
   * so if it is false, we know that we are already being called
   */
  DEBUG("destroy sender %x with queue %x (%d)", s, s->queue, s->keepsending);
  LOCK (&s->mtx);
  sockfd=s->sockfd;
  // check s->isrunning
  DEBUG("keepsending %d\tisrunning %d", s->keepsending, s->isrunning);
  if(!s->keepsending) {
    UNLOCK (&s->mtx);
    return;
  }
  s->keepsending=0;

  while(s->isrunning) {
    s->keepsending=0;
    queue_finish(s->queue);
    UNLOCK (&s->mtx);
    LOCK (&s->mtx);
  }

  UNLOCK (&s->mtx);

  queue_finish(s->queue);
  DEBUG("queue finished");

  pthread_join(s->thread, NULL);
  DEBUG("thread joined");
  queue_destroy(s->queue);

  pthread_mutex_destroy (&s->mtx);

  memset(s, 0, sizeof(t_iemnet_sender));
  s->sockfd = -1;
  free(s);
  s=NULL;
  DEBUG("destroyed sender");
}


t_iemnet_sender*iemnet__sender_create(int sock,
                                      t_iemnet_sendfunction sendfun, const void*userdata,
                                      int subthread)
{
  static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
  t_iemnet_sender*result=(t_iemnet_sender*)calloc(1,
                         sizeof(t_iemnet_sender));
  int res=0;
  DEBUG("create sender %x", result);
  if(NULL==result) {
    DEBUG("create sender failed");
    return NULL;
  }

  result->queue = queue_create();
  result->sockfd = sock;
  result->keepsending =1;
  result->isrunning=1;
  result->sendfun=sendfun;
  result->userdata=userdata;
  DEBUG("create_sender queue=%x", result->queue);

  memcpy(&result->mtx , &mtx, sizeof(pthread_mutex_t));
  res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result);

  if(0==res) {
  } else {
    // something went wrong
    queue_destroy(result->queue);
    free(result);
    return NULL;
  }

  DEBUG("created sender");
  return result;
}

/* coverity[param_set_but_not_used]: as x is there for potentially more specific implentations in the future */
int iemnet__sender_getlasterror(t_iemnet_sender*x)
{
#ifdef _WIN32
  return WSAGetLastError();
#endif
  return errno;
}


int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname,
                              void      *optval, socklen_t*optlen)
{
  int result=getsockopt(s->sockfd, level, optname, optval, optlen);
  if(result!=0) {
    error("iemnet::sender: getsockopt returned %d",
         iemnet__sender_getlasterror(s));
  }
  return result;
}
int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname,
                              const void*optval, socklen_t optlen)
{
  int result=setsockopt(s->sockfd, level, optname, optval, optlen);
  if(result!=0) {
    error("iemnet::sender: setsockopt returned %d",
         iemnet__sender_getlasterror(s));
  }
  return result;
}



int iemnet__sender_getsize(t_iemnet_sender*x)
{
  int size=-1;
  if(x && x->queue) {
    size=queue_getsize(x->queue);
  }

  return size;
}