Logo Search packages:      
Sourcecode: ladcca version File versions  Download package

comm.c

/*
 *   LADCCA
 *    
 *   Copyright (C) 2002, 2003 Robert Ham <rah@bash.sh>
 *    
 *   This program is free software; you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published by
 *   the Free Software Foundation; either version 2 of the License, or
 *   (at your option) any later version.
 *
 *   This program is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <netdb.h>
extern int h_errno;

#include <ladcca/ladcca.h>
#include <ladcca/internal_headers.h>

/**
 * Serialise a comm event into a wire buffer and send it over a socket.
 *
 * The event type selects which cca_buffer_from_comm_event_* helper builds
 * the buffer.  The buffer is handed to cca_sendall() and then freed here.
 *
 * @param socket  descriptor passed straight through to cca_sendall()
 * @param event   event to send; event->type picks the serialiser
 * @return the value returned by cca_sendall() (-1 is reported as a send
 *         error), or -1 without sending anything when the event type is
 *         not handled or no buffer was produced
 */
int
cca_comm_send_event (int socket, cca_comm_event_t * event)
{
  int err;
  size_t buf_size = 0;
  char * buf = NULL;

  switch (event->type)
  {
  case CCA_Comm_Event_Connect:
    cca_buffer_from_comm_event_connect (&buf, &buf_size, event->event_data.connect);
    break;
  case CCA_Comm_Event_Event:
    cca_buffer_from_comm_event_event (&buf, &buf_size, event->event_data.event);
    break;
  case CCA_Comm_Event_Config:
    cca_buffer_from_comm_event_config (&buf, &buf_size, event->event_data.config);
    break;
  case CCA_Comm_Event_Protocol_Mismatch:
    cca_buffer_from_comm_event_protocol_mismatch (&buf, &buf_size, event->event_data.number);
    break;
  case CCA_Comm_Event_Exec:
    cca_buffer_from_comm_event_exec (&buf, &buf_size, event->event_data.exec);
    break;
  case CCA_Comm_Event_Close:
  case CCA_Comm_Event_Ping:
  case CCA_Comm_Event_Pong:
    /* these types carry no payload, only the event itself */
    cca_buffer_from_comm_event (&buf, &buf_size, event);
    break;
  default:
    /* without this branch buf would be sent and freed uninitialised */
    fprintf (stderr, "%s: cannot send event of unknown type %d\n",
             __FUNCTION__, (int) event->type);
    return -1;
  }

  /* NOTE(review): assumes a serialiser that fails leaves buf untouched
     (still NULL) - confirm against the cca_buffer_from_* helpers */
  if (buf == NULL)
    {
      fprintf (stderr, "%s: error sending client event\n", __FUNCTION__);
      return -1;
    }

  /* send the buffer */
  err = cca_sendall (socket, buf, buf_size, 0);
  if (err == -1)
    {
      fprintf (stderr, "%s: error sending client event\n", __FUNCTION__);
    }

  free (buf);

  return err;
}


/**
 * Receive one message from a socket and decode it into a comm event.
 *
 * cca_recvall() supplies the message buffer, which this function frees
 * before returning.  The first 32-bit word of the message, in network
 * byte order, is the event type; it selects the decoder for the rest.
 *
 * @param socket  descriptor passed straight through to cca_recvall()
 * @param event   filled in with the type and decoded payload
 * @return the message size on success;
 *         a negative cca_recvall() result passed through unchanged;
 *         -1 when the message is too short to hold the type word;
 *         -3 when a Connect payload cannot be decoded
 */
int
cca_comm_recv_event (int socket, cca_comm_event_t * event)
{
  char * buf = NULL;
  size_t buf_size = 0;
  uint32_t net_type;
  int err;

  err = cca_recvall (socket, (void **) &buf, &buf_size, 0);
  if (err < 0)
    return err;

  /* the type word must be present before it can be read */
  if (buf == NULL || buf_size < sizeof net_type)
    {
      free (buf);
      return -1;
    }

  /* copy rather than cast, so the read does not depend on buf alignment */
  memcpy (&net_type, buf, sizeof net_type);
  event->type = ntohl (net_type);

  switch (event->type)
  {
  case CCA_Comm_Event_Connect:
    err = cca_comm_event_from_buffer_connect (buf, buf_size, event);
    if (err)
      {
        /* release the receive buffer on the failure path as well */
        free (buf);
        return -3;
      }
    break;
  case CCA_Comm_Event_Event:
    cca_comm_event_from_buffer_event (buf, buf_size, event);
    break;
  case CCA_Comm_Event_Config:
    cca_comm_event_from_buffer_config (buf, buf_size, event);
    break;
  case CCA_Comm_Event_Protocol_Mismatch:
    cca_comm_event_from_buffer_protocol_mismatch (buf, buf_size, event);
    break;
  case CCA_Comm_Event_Exec:
    cca_comm_event_from_buffer_exec (buf, buf_size, event);
    break;
  case CCA_Comm_Event_Close:
  case CCA_Comm_Event_Ping:
  case CCA_Comm_Event_Pong:
    cca_comm_event_from_buffer (buf, buf_size, event);
    break;
  default:
    /* unknown types are left undecoded; event->type is still set so
       the caller can inspect it */
    break;
  }

  free (buf);

  return (int) buf_size;
}

/**
 * Open a socket to the server and send it a Connect event.
 *
 * @param client   client whose socket member receives the new descriptor
 * @param server   server name handed to cca_open_socket()
 * @param service  service handed to cca_open_socket(); when NULL or empty
 *                 the default "ladcca" is used
 * @param connect  connection parameters carried in the Connect event
 * @return 0 on success, 1 if the socket could not be opened or the
 *         Connect event could not be sent (the socket is closed again in
 *         the latter case)
 */
int
cca_comm_connect_to_server (cca_client_t *client, const char * server, const char * service, cca_connect_params_t * connect)
{
  cca_comm_event_t connect_event;
  int err;

  /* honour the caller's service instead of always using the default */
  if (service == NULL || *service == '\0')
    service = "ladcca";

  err = cca_open_socket (&client->socket, server, service);
  if (err)
    {
      fprintf (stderr, "%s: could not create server connection\n", __FUNCTION__);
      return 1;
    }

  connect_event.type = CCA_Comm_Event_Connect;
  connect_event.event_data.connect = connect;

  CCA_DEBUG ("sending Connect event");
  err = cca_comm_send_event (client->socket, &connect_event);
  if (err == -1)
    {
      fprintf (stderr, "%s: error sending connect event to the server\n", __FUNCTION__);
      close (client->socket);
      return 1;
    }
  CCA_DEBUG ("Connect event sent");

  return 0;
}

/*
 * Tear down the client's communication state.
 *
 * Flags both loops to stop, wakes the send thread and joins it, closes
 * the socket, releases the client's args and class, destroys the outgoing
 * queue's mutex and condition variable, and destroys any comm events
 * still queued for sending.
 */
static void
cca_comm_recv_finish (cca_client_t * client)
{
  cca_list_t * node;

  client->recv_close = 1;

  /* set the flag and signal while holding the mutex the sender waits
     under, so the wakeup cannot slip in between its check and its wait */
  pthread_mutex_lock (&client->comm_events_out_lock);
  client->send_close = 1;
  pthread_cond_signal (&client->send_conditional);
  pthread_mutex_unlock (&client->comm_events_out_lock);

  pthread_join (client->send_thread, NULL);

  close (client->socket);
  cca_args_destroy (client->args);
  client->args = NULL;
  free (client->class);
  client->class = NULL;

  /* the send thread has been joined, so nothing else uses these here */
  pthread_mutex_destroy (&client->comm_events_out_lock);
  pthread_cond_destroy (&client->send_conditional);

  for (node = client->comm_events_out; node; node = cca_list_next (node))
    cca_comm_event_destroy ((cca_comm_event_t *) node->data);
  cca_list_free (client->comm_events_out);
  /* do not leave a pointer to the freed list behind */
  client->comm_events_out = NULL;
}

/*
 * React to losing the server: queue a CCA_Server_Lost event on the
 * client's incoming event list, clear server_connected, then run the
 * common teardown in cca_comm_recv_finish().
 */
static void
cca_comm_recv_lost_server (cca_client_t * client)
{
  cca_event_t * lost_event = cca_event_new_with_type (CCA_Server_Lost);

  /* events_in is only modified while events_in_lock is held */
  pthread_mutex_lock (&client->events_in_lock);
  client->events_in = cca_list_append (client->events_in, lost_event);
  pthread_mutex_unlock (&client->events_in_lock);

  client->server_connected = 0;

  cca_comm_recv_finish (client);
}

/**
 * Receive loop: read comm events from the client's socket and dispatch
 * them until client->recv_close is set.
 *
 * - Event and Config payloads are appended to the client's incoming lists
 *   under their respective locks.
 * - A Ping queues a Pong on the outgoing list and signals the sender.
 * - A Protocol_Mismatch is reported and handled as a lost server.
 * - A Close runs the common teardown.
 *
 * @param data  the cca_client_t to service, passed as void *
 *              (the signature matches a pthread start routine)
 * @return always NULL
 */
void *
cca_comm_recv_run (void * data)
{
  cca_client_t * client;
  cca_comm_event_t comm_event;
  cca_event_t * event;
  cca_config_t * config;
  int err;

  client = (cca_client_t *) data;

  while (!client->recv_close)
    {
      err = cca_comm_recv_event (client->socket, &comm_event);

      if (err == -1)
        {
          fprintf (stderr, "%s: error recieving event\n", __FUNCTION__);
          continue;
        }

      if (err == -2)
        {
          CCA_DEBUG ("server disconnected");
          cca_comm_recv_lost_server (client);
          /* nothing was received, so comm_event holds no valid data and
             the teardown has already run: stop instead of dispatching */
          break;
        }

      switch (comm_event.type)
        {
        case CCA_Comm_Event_Event:
          event = comm_event.event_data.event;

          /* add the event to the event buffer */
          pthread_mutex_lock (&client->events_in_lock);
          client->events_in = cca_list_append (client->events_in, event);
          pthread_mutex_unlock (&client->events_in_lock);

          break;


        case CCA_Comm_Event_Config:
          config = comm_event.event_data.config;

          CCA_DEBUGARGS ("recieved config with key '%s'", cca_config_get_key (config));

          /* add to the configs */
          pthread_mutex_lock (&client->configs_in_lock);
          client->configs_in = cca_list_append (client->configs_in, config);
          pthread_mutex_unlock (&client->configs_in_lock);

          break;


        case CCA_Comm_Event_Ping: {
          cca_comm_event_t * ev;
          ev = cca_comm_event_new ();
          cca_comm_event_set_type (ev, CCA_Comm_Event_Pong);

          /* queue the reply and wake the send thread */
          pthread_mutex_lock (&client->comm_events_out_lock);
          client->comm_events_out = cca_list_append (client->comm_events_out, ev);
          pthread_mutex_unlock (&client->comm_events_out_lock);
          pthread_cond_signal (&client->send_conditional);
          break;
        }

        case CCA_Comm_Event_Protocol_Mismatch:
          fprintf (stderr,
                   "%s: protocol version mismatch!; server is using protocol version %s\n",
                   __FUNCTION__,
                   cca_protocol_string (comm_event.event_data.number));
          /* the teardown sets recv_close, which ends the loop */
          cca_comm_recv_lost_server (client);
          break;

        case CCA_Comm_Event_Close:
          /* the teardown sets recv_close, which ends the loop */
          cca_comm_recv_finish (client);
          break;

        default:
          fprintf (stderr, "%s: recieved unknown event of type %d\n",
                   __FUNCTION__, comm_event.type);
          break;
        }
    }

  return NULL;
}

/**
 * Send loop: wait for comm events on the client's outgoing list and send
 * each one over the client's socket until client->send_close is set.
 *
 * The whole pending list is detached under comm_events_out_lock and then
 * sent without the lock held.  Each event is removed from the detached
 * list and freed after its send attempt, whether or not the send worked.
 *
 * If send_close is seen, the loop exits without detaching the list, so
 * events still queued stay on client->comm_events_out.
 *
 * @param data  the cca_client_t to service, passed as void *
 *              (the signature matches a pthread start routine)
 * @return always NULL
 */
void *
cca_comm_send_run (void * data)
{
  cca_client_t * client;
  cca_list_t * list;
  cca_comm_event_t * comm_event;
  int err;

  client = (cca_client_t *) data;

  for (;;)
    {
      pthread_mutex_lock (&client->comm_events_out_lock);

      /* re-test the predicate after every wakeup: this covers spurious
         wakeups and a close request made before the wait began */
      while (!client->comm_events_out && !client->send_close)
        pthread_cond_wait (&client->send_conditional, &client->comm_events_out_lock);

      if (client->send_close)
        {
          /* leave the queue attached to the client rather than dropping
             a detached copy on the floor */
          pthread_mutex_unlock (&client->comm_events_out_lock);
          break;
        }

      list = client->comm_events_out;
      client->comm_events_out = NULL;
      pthread_mutex_unlock (&client->comm_events_out_lock);

      while (list)
        {
          comm_event = (cca_comm_event_t *) list->data;

          err = cca_comm_send_event (client->socket, comm_event);
          if (err == -1)
            {
              fprintf (stderr, "%s: error sending client comm event\n",
                       __FUNCTION__);
            }

          list = cca_list_remove (list, comm_event);

          cca_comm_event_free (comm_event);
          free (comm_event);
        }
    }

  return NULL;
}

/* EOF */

Generated by  Doxygen 1.6.0   Back to index