[netperf-dev] netperf4 commit notice r93 - branches/glib_migration/src

raj at netperf.org raj at netperf.org
Thu Mar 23 14:09:22 PST 2006


Author: raj
Date: 2006-03-23 14:09:21 -0800 (Thu, 23 Mar 2006)
New Revision: 93

Modified:
   branches/glib_migration/src/netlib.c
   branches/glib_migration/src/netlib.h
   branches/glib_migration/src/netmsg.c
   branches/glib_migration/src/netperf.h
   branches/glib_migration/src/netserver.c
Log:
Egads! This actually completes the default tests successfully with
a netserver that uses glib event loops for accepting and spawns a new
netserver process on each control connection.  Of course, the spawned
process never terminates and, I think, gets stuck in a loop :)
but this is a milestone worthy of a commit.  If I knew how to compile
nettest_bsd.c as a Windows DLL I might be so bold as to try that, but
I suppose it would be best to get things cleaner under Linux/Glib first.


Modified: branches/glib_migration/src/netlib.c
===================================================================
--- branches/glib_migration/src/netlib.c	2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netlib.c	2006-03-23 22:09:21 UTC (rev 93)
@@ -141,6 +141,7 @@
 extern test_hash_t test_hash[TEST_HASH_BUCKETS];
 extern tset_hash_t test_set_hash[TEST_SET_HASH_BUCKETS];
 
+
 #define HIST  void*
 
 #include "nettest_bsd.h"
@@ -149,6 +150,34 @@
 #define PATH_MAX MAX_PATH
 #endif
 
+/* a kludge until I can better structure the code */
+extern server_hash_t netperf_hash[];
+
+/* Push new_netperf onto the front of a bucket chain in hash, holding
+   that bucket's hash_lock for the duration of the update.  Every
+   server lands in bucket 0 for now, whatever do_hash says.  Always
+   returns NPE_SUCCESS. */
+int
+add_server_to_specified_hash(server_hash_t *hash, server_t *new_netperf, gboolean do_hash) {
+
+  /* no real hash function yet: both settings of do_hash pick bucket 0 */
+  int            slot   = do_hash ? 0 : 0;
+  server_hash_t *bucket = &hash[slot];
+
+  /* error checking on the lock calls is still to be added */
+  NETPERF_MUTEX_LOCK(bucket->hash_lock);
+
+  /* the new entry becomes the head of the chain and remembers which
+     lock guards it */
+  new_netperf->next = bucket->server;
+  new_netperf->lock = bucket->hash_lock;
+  bucket->server    = new_netperf;
+
+  NETPERF_MUTEX_UNLOCK(bucket->hash_lock);
+
+  return(NPE_SUCCESS);
+}
+
 /* given a filename, return the first path to the file that stats
    successfully - which is either the name already given, or that name
    with NETPERFDIR prepended. */
@@ -1631,7 +1660,69 @@
 {
 }
 
+/* Build the server_t for the netperf at the far end of the control
+   channel source from the first message it sent, and add it to the
+   server hash of the global_state_t passed as data.  Returns TRUE
+   once the entry is added, FALSE when the message is empty or its
+   first child is not "version".  Exits if malloc fails. */
+static gboolean
+allocate_netperf(GIOChannel *source, xmlDocPtr message, gpointer data) {
+  global_state_t *global_state = data;
+  server_t       *netperf;

+  xmlNodePtr msg = xmlDocGetRootElement(message);
+  xmlNodePtr cur;
+
+  if (msg == NULL) {
+    g_fprintf(stderr,"empty document\n");
+    return FALSE;
+  }
+
+  /* a message with no children has no name to compare, so check cur
+     before looking at it */
+  cur = msg->xmlChildrenNode;
+  if ((cur == NULL) ||
+      (xmlStrcmp(cur->name,(const xmlChar *)"version") != 0)) {
+    if (debug) {
+      g_fprintf(where,
+                "%s: Received an unexpected first message\n", __func__);
+      fflush(where);
+    }
+    return FALSE;
+  }
+
+  /* require the caller to ensure the netperf isn't already around */
+  if ((netperf = malloc(sizeof(*netperf))) == NULL) {
+    g_fprintf(where,"%s: malloc failed\n", __func__);
+    exit(1);
+  }
+  memset(netperf,0,sizeof(*netperf));
+
+  /* xmlGetProp hands back a copy that the caller owns, so it is stored
+     as is; wrapping it in xmlStrdup would leak the first copy */
+  netperf->id        = xmlGetProp(msg,(const xmlChar *)"fromnid");
+  netperf->my_nid    = xmlGetProp(msg,(const xmlChar *)"tonid");
+#ifdef G_OS_WIN32
+  netperf->sock      = g_io_channel_win32_get_fd(source);
+#else
+  netperf->sock      = g_io_channel_unix_get_fd(source);
+#endif
+  netperf->source    = source;
+  netperf->state     = NSRV_PREINIT;
+  netperf->state_req = NSRV_WORK;
+#ifdef WITH_GLIB
+  netperf->thread_id = NULL;
+#else
+  netperf->thread_id = -1;
+#endif
+  netperf->next      = NULL;
+
+  add_server_to_specified_hash(global_state->server_hash, netperf, FALSE);
+  return TRUE;
+}
+
+
+
 /* loop and grab all the available bytes, but no more than N from the
    source and return. add the number of bytes received to the
    bytes_read parameter */
@@ -1677,7 +1768,7 @@
 /* given a buffer with a complete control message, XML parse it and
    then send it on its way. */
 gboolean
-xml_parse_control_message(gchar *message, gsize length, gpointer data) {
+xml_parse_control_message(gchar *message, gsize length, gpointer data, GIOChannel *source) {
   
   xmlDocPtr xml_message;
   int rc = NPE_SUCCESS;
@@ -1705,7 +1796,16 @@
 		message,
 		xml_message);
     }
-    /* lookup its destination and send it on its way */
+    /* was this the first message on the control connection? */
+    if (global_state->first_message) {
+      allocate_netperf(source,xml_message,data);
+      global_state->first_message = FALSE;
+    }
+
+    /* lookup its destination and send it on its way. we are
+       ass-u-me-ing that there is only one netperf to be found, which
+       ultimately may not be correct so don't forget to come back here
+       then... */
     netperf = global_state->server_hash[0].server;
     /* mutex locking is not required because only one netserver thread
        looks at these data structures per sgb */
@@ -1864,7 +1964,8 @@
       /* we have an entire message, time to process it */
       ret = xml_parse_control_message(message_state->buffer,
 				      message_state->bytes_received,
-				      data);
+				      data,
+				      source);
       /* let us not forget to reset our message_state shall we?  we
 	 don't really want to re-parse the same message over and over
 	 again... raj 2006-03-22 */

Modified: branches/glib_migration/src/netlib.h
===================================================================
--- branches/glib_migration/src/netlib.h	2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netlib.h	2006-03-23 22:09:21 UTC (rev 93)
@@ -146,6 +146,7 @@
   message_state_t *message_state; /* so we can keep track of partials */
   GMainLoop       *loop;          /* so we can add things to the loop */
   gboolean        is_netserver;   /* not sure if this is really necessary */
+  gboolean        first_message;  /* do we await the first message? */
 } global_state_t;
 
 extern void netlib_init();

Modified: branches/glib_migration/src/netmsg.c
===================================================================
--- branches/glib_migration/src/netmsg.c	2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netmsg.c	2006-03-23 22:09:21 UTC (rev 93)
@@ -129,7 +129,7 @@
 };
 
 const struct msgs   NS_Msgs[] = {
-  /* Message name, function,             StateBitMap */
+  /* Message name,     function,                StateBitMap */
 #ifdef OFF
   { "clear",           clear_message,            0x00000000 },
   { "error",           error_message,            0x00000000 },
@@ -328,7 +328,7 @@
     /* versions match */
     netperf->state      =  NSRV_VERS;
     netperf->state_req  =  NSRV_WORK;
-    rc = send_version_message(netperf,my_nid);
+    rc = send_version_message(netperf,netperf->my_nid);
   } else {
     /* versions don't match */
     if (debug) {
@@ -509,7 +509,7 @@
     rc = send_control_message(server->sock,
                               stats,
                               server->id,
-                              my_nid);
+                              server->my_nid);
     if (rc != NPE_SUCCESS) {
       if (debug) {
         fprintf(where,
@@ -544,7 +544,7 @@
     rc = send_control_message(server->sock,
                               sys_stats,
                               server->id,
-                              my_nid);
+                              server->my_nid);
     if (rc != NPE_SUCCESS) {
       if (debug) {
         fprintf(where,

Modified: branches/glib_migration/src/netperf.h
===================================================================
--- branches/glib_migration/src/netperf.h	2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netperf.h	2006-03-23 22:09:21 UTC (rev 93)
@@ -216,6 +216,9 @@
   xmlChar          *id;          /* the id of the server instance. used
                                     in searches and as sanity checks  */
 
+  xmlChar          *my_nid;      /* used in netserver, used to be
+				    global */
+
   NETPERF_RWLOCK_T rwlock;       /* the mutex used to ensure exclusive
                                     access to this servers resources */
 
@@ -228,6 +231,9 @@
   SOCKET           sock;         /* the socket over which we communicate
                                     with the server */
 
+  GIOChannel       *source;      /* the control channel over which we
+				    communicate with the server */
+
   ns_state_t       state;        /* in what state is this server
                                     presently? */
 

Modified: branches/glib_migration/src/netserver.c
===================================================================
--- branches/glib_migration/src/netserver.c	2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netserver.c	2006-03-23 22:09:21 UTC (rev 93)
@@ -116,7 +116,6 @@
 
 /* gcc does not like intializing where here */
 FILE *where;
-xmlChar *my_nid=NULL;
 
 #define NETPERF_HASH_BUCKETS 1
 
@@ -566,6 +565,7 @@
 {
   int rc=NPE_BAD_VERSION;
   xmlChar  * from_nid;
+  xmlChar  * my_nid;
   xmlDocPtr  message;
   xmlNodePtr msg;
   xmlNodePtr cur;
@@ -606,6 +606,7 @@
           }
           memset(netperf,0,sizeof(server_t));
           netperf->id        = from_nid;
+	  netperf->my_nid    = my_nid;
           netperf->sock      = sock;
           netperf->state     = NSRV_CONNECTED;
           netperf->state_req = NSRV_WORK;
@@ -638,6 +639,7 @@
   } /* end of while */
   return(rc);
 }
+
 
 
 static void
@@ -744,7 +746,7 @@
         }
         test->state = new;
         if (msg) {
-          rc = send_control_message(netperf->sock, msg, netperf->id, my_nid);
+          rc = send_control_message(netperf->sock, msg, netperf->id, netperf->my_nid);
           if (rc != NPE_SUCCESS) {
             if (debug) {
               g_fprintf(where,
@@ -763,6 +765,104 @@
        netserver thread looks at these data structures sgb */
   }
 }
+
+/* the periodic callback function that will check test states and
+   report back any changes.  it looks very much like the original
+   check_test_state() routine - for now anyway. it might be nice to
+   one day get this set such that state change notification was
+   immediate... raj 2006-03-23.  it is cast to a GSourceFunc for
+   g_timeout_add(), so it returns TRUE to stay scheduled. */
+
+static gboolean
+check_test_state_callback(gpointer data)
+{
+  int           i;
+  uint32_t      orig;
+  uint32_t      new;
+  int           rc;
+  test_t       *test;
+  xmlNodePtr    msg;
+  xmlNodePtr    new_node;
+  xmlChar      *id;
+  server_t     *netperf;
+  char          code[16];  /* holds any int formatted with %d */
+
+  netperf = netperf_hash[0].server;
+  if (netperf == NULL) {
+    /* nobody to report to yet, leave the changes pending */
+    return TRUE;
+  }
+
+  for (i = 0; i < TEST_HASH_BUCKETS; i ++) {
+    /* no mutex: only one netserver thread looks at these data sgb */
+    test = test_hash[i].test;
+    while (test != NULL) {
+      orig = test->state;
+      new  = test->new_state;
+      id   = test->id;  /* always valid for delete_test() below */
+      msg  = NULL;      /* never resend an earlier test's message */
+      if (orig != new) {
+        /* report change in test state */
+        if (debug) {
+          g_fprintf(where,"%s:tid = %s  state %d  new_state %d\n",
+                  __func__, test->id, orig, new);
+          fflush(where);
+        }
+        switch (new) {
+        case TEST_INIT:
+          /* what kind of error checking do we want to add ? */
+          msg = xmlNewNode(NULL,(xmlChar *)"initialized");
+          xmlSetProp(msg,(xmlChar *)"tid",test->id);
+          new_node = xmlCopyNode(test->dependent_data,1);
+          xmlAddChild(msg,new_node);
+          break;
+        case TEST_IDLE:
+          msg = xmlNewNode(NULL,(xmlChar *)"idled");
+          xmlSetProp(msg,(xmlChar *)"tid",test->id);
+          break;
+        case TEST_LOADED:
+          msg = xmlNewNode(NULL,(xmlChar *)"loaded");
+          xmlSetProp(msg,(xmlChar *)"tid",test->id);
+          break;
+        case TEST_MEASURE:
+          msg = xmlNewNode(NULL,(xmlChar *)"measuring");
+          xmlSetProp(msg,(xmlChar *)"tid",test->id);
+          break;
+        case TEST_ERROR:
+          msg = xmlNewNode(NULL,(xmlChar *)"error");
+          xmlSetProp(msg,(xmlChar *)"tid",test->id);
+          xmlSetProp(msg,(xmlChar *)"err_fn",(xmlChar *)test->err_fn);
+          xmlSetProp(msg,(xmlChar *)"err_str",(xmlChar *)test->err_str);
+          g_snprintf(code,sizeof(code),"%d",test->err_rc);
+          xmlSetProp(msg,(xmlChar *)"err_rc",(xmlChar *)code);
+          g_snprintf(code,sizeof(code),"%d",test->err_no);
+          xmlSetProp(msg,(xmlChar *)"err_no",(xmlChar *)code);
+          break;
+        case TEST_DEAD:
+          msg = xmlNewNode(NULL,(xmlChar *)"dead");
+          xmlSetProp(msg,(xmlChar *)"tid",test->id);
+          break;
+        default:
+          break;
+        }
+        test->state = new;
+        if (msg) {
+          rc = send_control_message(netperf->sock, msg, netperf->id, netperf->my_nid);
+          if ((rc != NPE_SUCCESS) && debug) {
+            g_fprintf(where,"%s: send_control_message failed\n", __func__);
+            fflush(where);
+          }
+        }
+      }
+      /* step past the test before delete_test() gets a chance at it */
+      test = test->next;
+      if (new == TEST_DEAD) {
+        delete_test(id);
+      }
+    }
+  }
+  return TRUE;
+}
 
 
 
@@ -824,7 +924,7 @@
       loop = 0;
     }
     if (msg) {
-      rc = send_control_message(netperf->sock, msg, netperf->id, my_nid);
+      rc = send_control_message(netperf->sock, msg, netperf->id, netperf->my_nid);
       if (rc != NPE_SUCCESS) {
         if (debug) {
           g_fprintf(where,
@@ -982,9 +1082,7 @@
     CLOSE_SOCKET(control_socket);  /* don't want to leak descriptors */
   }
   else {
-    /* for now, we will simply start processing requests the
-       "original" way. later we will establish a recv callback for a
-       proper glib event loop */
+    /* lets setup the glib event loop... */
     g_fprintf(where,"accepted a connection but told not to spawn\n");
 #ifdef G_OS_WIN32
     control_channel = g_io_channel_win32_new_socket(control_socket);
@@ -1437,11 +1535,12 @@
   global_state_ptr->test_hash     = test_hash;
   global_state_ptr->message_state = g_malloc(sizeof(message_state_t));
   global_state_ptr->is_netserver  = TRUE;
+  global_state_ptr->first_message = TRUE;
   global_state_ptr->loop          = loop;
-  global_state_ptr->message_state->have_header = FALSE;
-  global_state_ptr->message_state->bytes_received = 0;
+  global_state_ptr->message_state->have_header     = FALSE;
+  global_state_ptr->message_state->bytes_received  = 0;
   global_state_ptr->message_state->bytes_remaining = 4;
-  global_state_ptr->message_state->buffer = NULL;
+  global_state_ptr->message_state->buffer          = NULL;
 
   if (need_setup) {
     listen_sock = setup_listen_endpoint(listen_port);
@@ -1488,11 +1587,13 @@
   else {
     /* we used to call handle_netperf_requests(sock); here, now we use
        the loop, luke... */
+
 #ifdef G_OS_WIN32
     control_channel = g_io_channel_win32_new_socket(control_socket);
 #else
     control_channel = g_io_channel_unix_new(control_socket);
 #endif
+
     status = g_io_channel_set_flags(control_channel,G_IO_FLAG_NONBLOCK,&error);
     if (error) {
       g_warning("g_io_channel_set_flags %s %d %s\n",
@@ -1524,6 +1625,9 @@
     g_print("added watch id %d\n",watch_id);
 
     g_print("Starting loop to process stuff...\n");
+    g_timeout_add(1000,
+		  (GSourceFunc)check_test_state_callback,
+		  global_state_ptr);
     g_main_loop_run(loop);
     g_print("Came out of the main loop\n");
   }



More information about the netperf-dev mailing list