[netperf-dev] netperf4 commit notice r97 - branches/glib_migration/src

raj at netperf.org raj at netperf.org
Fri Mar 24 11:54:34 PST 2006


Author: raj
Date: 2006-03-24 11:54:33 -0800 (Fri, 24 Mar 2006)
New Revision: 97

Modified:
   branches/glib_migration/src/netlib.c
   branches/glib_migration/src/netmsg.c
   branches/glib_migration/src/netperf.c
   branches/glib_migration/src/netserver.c
Log:
Use select() instead of poll() since there is select() under Windows
but not poll(), and we would like to get things going on Windows without
having to to a complete migration of the netperf side of things to glib


Modified: branches/glib_migration/src/netlib.c
===================================================================
--- branches/glib_migration/src/netlib.c	2006-03-24 19:37:49 UTC (rev 96)
+++ branches/glib_migration/src/netlib.c	2006-03-24 19:54:33 UTC (rev 97)
@@ -40,6 +40,10 @@
 #include <stdio.h>
 #endif
 
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
 #ifdef HAVE_SYS_TYPES_H
 #include <sys/types.h>
 #endif
@@ -91,10 +95,6 @@
 #endif
 #endif
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-
 #ifdef HAVE_SYS_RESOURCE_H
 #include <sys/resource.h>
 #endif
@@ -2103,11 +2103,19 @@
   char *read_ptr = NULL;
   char *message_base = NULL;
 
-  struct pollfd fds;
+  fd_set read_fds;
+  fd_set error_fds;
+  struct timeval timeout;
 
-  int timeout = 15000;
+  timeout.tv_sec = 15;
+  timeout.tv_usec = 0;
 
+  FD_ZERO(&read_fds);
+  FD_ZERO(&error_fds);
 
+  FD_SET(control_sock,&read_fds);
+  FD_SET(control_sock,&error_fds);
+
   /* one of these days, we probably aught to make sure that what
      message points to is NULL... but only as a debug assert... raj
      2003-03-05 */
@@ -2121,20 +2129,31 @@
   bytes_left = sizeof(uint32_t);
   read_ptr = (char *)&message_len;
   while (bytes_left > 0) {
-    /* precede every recv with a poll() call, so there can be little
+    /* precede every recv with a select() call, so there can be little
        chance of our ever getting hung-up in this routine. raj
        2003-02-26 */
-    fds.fd = control_sock;
-    fds.events = POLLIN;
-    fds.revents = 0;
 
-    /* poll had better return one, or there was either a problem or
+    /* since some selects update the timeout value, we best set it each time */
+    timeout.tv_sec = 15;
+    timeout.tv_usec = 0;
+    
+    FD_ZERO(&read_fds);
+    FD_ZERO(&error_fds);
+    
+    FD_SET(control_sock,&read_fds);
+    FD_SET(control_sock,&error_fds);
+
+    /* select had better return one, or there was either a problem or
        a timeout... */
 
-    if ((counter = poll(&fds, 1, timeout)) != 1) {
+    if ((counter = select(control_sock + 1,
+			  &read_fds, 
+			  NULL, 
+			  &error_fds, 
+			  &timeout)) != 1) {
       if (debug) {
         fprintf(where,
-                "recv_control_message: poll error or timeout. errno %d counter %d\n",
+                "recv_control_message: select error or timeout. errno %d counter %d\n",
                 errno,
                 counter);
         fflush(where);
@@ -2185,19 +2204,32 @@
   read_ptr = message_base;
 
   while (bytes_left > 0) {
-    /* precede every recv with a poll() call, so there can be little
-       chance of our ever getting hung-up in this routine. raj 2003-02-26 */
-    fds.fd = control_sock;
-    fds.events = POLLIN;
-    fds.revents = 0;
+    /* precede every recv with a select() call, so there can be little
+       chance of our ever getting hung-up in this routine. eventually,
+       we will migrate all this stuff over to the glib IO and event
+       loop constructs. raj 2003-02-26 */
 
-    /* poll had better return one, or there was either a problem or
+    /* since some selects update the timeout value, we best set it each time */
+    timeout.tv_sec = 15;
+    timeout.tv_usec = 0;
+    
+    FD_ZERO(&read_fds);
+    FD_ZERO(&error_fds);
+    
+    FD_SET(control_sock,&read_fds);
+    FD_SET(control_sock,&error_fds);
+
+    /* select had better return one, or there was either a problem or
        a timeout... */
 
-    if ((counter = poll(&fds, 1, 15000)) != 1) {
+    if ((counter = select(control_sock + 1,
+			  &read_fds,
+			  NULL,
+			  &error_fds,
+			  &timeout)) != 1) {
       if (debug) {
         fprintf(where,
-                "recv_control_message: poll error or timeout. errno %d counter %d\n",
+                "recv_control_message: select error or timeout on message body. errno %d counter %d\n",
                 errno,
                 counter);
         fflush(where);

Modified: branches/glib_migration/src/netmsg.c
===================================================================
--- branches/glib_migration/src/netmsg.c	2006-03-24 19:37:49 UTC (rev 96)
+++ branches/glib_migration/src/netmsg.c	2006-03-24 19:54:33 UTC (rev 97)
@@ -52,10 +52,6 @@
 #include <netinet/in.h>
 #endif
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-
 #ifdef HAVE_SYS_TIME_H
 /* seems that Darwin or at least MacOS X 4.3 needs sys/time with
    sys/resource */

Modified: branches/glib_migration/src/netperf.c
===================================================================
--- branches/glib_migration/src/netperf.c	2006-03-24 19:37:49 UTC (rev 96)
+++ branches/glib_migration/src/netperf.c	2006-03-24 19:54:33 UTC (rev 97)
@@ -57,6 +57,13 @@
 #include <sys/types.h>
 #endif
 
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#ifdef TIME_WITH_SYS_TIME
+#include <time.h>
+#endif
+#endif
+
 #ifdef HAVE_SYS_STAT_H
 #include <sys/stat.h>
 #endif
@@ -85,8 +92,8 @@
 #include <netinet/in.h>
 #endif
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
 #endif
 
 #ifdef HAVE_UNISTD_H
@@ -906,7 +913,10 @@
 wait_for_version_response(server_t *server)
 {
   int        rc = NPE_SUCCESS;
-  struct     pollfd fds;
+  fd_set     read_fds;
+  fd_set     error_fds;
+  struct timeval timeout;
+
   xmlDocPtr  message;
 
   if (debug) {
@@ -915,11 +925,19 @@
   }
   NETPERF_MUTEX_LOCK(server->lock);
   while (server->state == NSRV_VERS) {
-    fds.fd      = server->sock;
-    fds.events  = POLLIN;
-    fds.revents = 0;
+    timeout.tv_sec = 5;
+    timeout.tv_usec = 0;
+    FD_ZERO(&read_fds);
+    FD_ZERO(&error_fds);
+    FD_SET(server->sock,&read_fds);
+    FD_SET(server->sock,&error_fds);
+
     NETPERF_MUTEX_UNLOCK(server->lock);
-    if (poll(&fds,1,5000) > 0) {
+    if (select(server->sock + 1,
+	       &read_fds,
+	       NULL,
+	       &error_fds,
+	       &timeout) > 0) {
       if (debug) {
         fprintf(where,"wait_for_version_response ");
         fprintf(where,"calling recv_control_message\n");
@@ -1213,7 +1231,10 @@
 {
   int rc;
   server_t     *server = data;
-  struct pollfd fds;
+  fd_set read_fds;
+  fd_set error_fds;
+  struct timeval timeout;
+
   xmlDocPtr     message;
   
   rc = wait_for_version_response(server);
@@ -1223,11 +1244,19 @@
   NETPERF_MUTEX_LOCK(server->lock);
 
   while (server->state != NSRV_ERROR) {
-    fds.fd      = server->sock;
-    fds.events  = POLLIN;
-    fds.revents = 0;
+    FD_ZERO(&read_fds);
+    FD_ZERO(&error_fds);
+    FD_SET(server->sock,&read_fds);
+    FD_SET(server->sock,&error_fds);
+    timeout.tv_sec = 5;
+    timeout.tv_usec = 0;
+
     NETPERF_MUTEX_UNLOCK(server->lock);
-    if (poll(&fds,1,5000) > 0) {
+    if (select(server->sock + 1,
+	       &read_fds,
+	       NULL,
+	       &error_fds,
+	       &timeout) > 0) {
       rc = recv_control_message(server->sock, &message);
       if (rc > 0) {
         rc = process_message(server, message);

Modified: branches/glib_migration/src/netserver.c
===================================================================
--- branches/glib_migration/src/netserver.c	2006-03-24 19:37:49 UTC (rev 96)
+++ branches/glib_migration/src/netserver.c	2006-03-24 19:54:33 UTC (rev 97)
@@ -85,10 +85,6 @@
 #include <netinet/in.h>
 #endif
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-
 #ifdef HAVE_LIMITS_H
 #include <limits.h>
 #endif
@@ -539,86 +535,6 @@
 
 
 
-static int
-instantiate_netperf( int sock )
-{
-  int rc=NPE_BAD_VERSION;
-  xmlChar  * from_nid;
-  xmlChar  * my_nid;
-  xmlDocPtr  message;
-  xmlNodePtr msg;
-  xmlNodePtr cur;
-  server_t * netperf;
-
-  NETPERF_DEBUG_ENTRY(debug,where);
-
-  while (rc != NPE_SUCCESS) {
-    rc = recv_control_message(sock,&message);
-    if (rc > 0) { /* received a message */
-      if (debug) {
-        g_fprintf(where, "Received a control message to doc %p\n", message);
-        fflush(where);
-      }
-      msg = xmlDocGetRootElement(message);
-      if (msg == NULL) {
-        g_fprintf(stderr,"empty document\n");
-        xmlFreeDoc(message);
-        rc = NPE_EMPTY_MSG;
-      } else {
-        cur = msg->xmlChildrenNode;
-        if (xmlStrcmp(cur->name,(const xmlChar *)"version")!=0) {
-          if (debug) {
-            g_fprintf(where,
-                    "%s: Received an unexpected message\n", __func__);
-            fflush(where);
-          }
-          rc = NPE_UNEXPECTED_MSG;
-        } else {
-          /* require the caller to ensure the netperf isn't already around */
-          my_nid   = xmlStrdup(xmlGetProp(msg,(const xmlChar *)"tonid"));
-          from_nid = xmlStrdup(xmlGetProp(msg,(const xmlChar *)"fromnid"));
-
-          if ((netperf = (server_t *)malloc(sizeof(server_t))) == NULL) {
-            g_fprintf(where,"%s: malloc failed\n", __func__);
-            fflush(where);
-            exit(1);
-          }
-          memset(netperf,0,sizeof(server_t));
-          netperf->id        = from_nid;
-	  netperf->my_nid    = my_nid;
-          netperf->sock      = sock;
-          netperf->state     = NSRV_CONNECTED;
-          netperf->state_req = NSRV_WORK;
-#ifdef WITH_GLIB
-	  netperf->thread_id       = NULL;
-#else
-          netperf->thread_id       = -1;
-#endif
-          netperf->next      = NULL;
-
-          /* check the version */
-          rc = ns_version_check(cur,message,netperf);
-        }
-      }
-    } 
-    else {
-      if (debug) {
-        g_fprintf(where,"%s: close connection remote close\n", __func__);
-        fflush(where);
-      }
-      CLOSE_SOCKET(sock);
-      exit(-1);
-    }
-    if (rc == NPE_SUCCESS) {
-      add_netperf_to_hash(netperf);
-    } else {
-      free(netperf);
-    }
-    xmlFreeDoc(message);
-  } /* end of while */
-  return(rc);
-}
-
 
 
 static void
@@ -653,98 +569,6 @@
   }
 }
 
-static void
-check_test_state()
-{
-  int           i;
-  uint32_t      orig;
-  uint32_t      new;
-  int           rc;
-  test_t       *test;
-  test_hash_t  *h;
-  xmlNodePtr    msg = NULL;
-  xmlNodePtr    new_node;
-  xmlChar      *id;
-  server_t     *netperf;
-  char          code[8];
-
-  netperf = netperf_hash[0].server;
-
-  for (i = 0; i < TEST_HASH_BUCKETS; i ++) {
-    h = &test_hash[i];
-    /* mutex locking is not required because only one 
-       netserver thread looks at these data structures sgb */
-    test = h->test;
-    while (test != NULL) {
-      orig = test->state;
-      new  = test->new_state;
-      if (orig != new) {
-        /* report change in test state */
-        if (debug) {
-          g_fprintf(where,"%s:tid = %s  state %d  new_state %d\n",
-                  __func__, test->id, orig, new);
-          fflush(where);
-        }
-        switch (new) {
-        case TEST_INIT:
-          /* what kind of error checking do we want to add ? */
-          msg = xmlNewNode(NULL,(xmlChar *)"initialized");
-          xmlSetProp(msg,(xmlChar *)"tid",test->id);
-          new_node = xmlCopyNode(test->dependent_data,1);
-          xmlAddChild(msg,new_node);
-          break;
-        case TEST_IDLE:
-          msg = xmlNewNode(NULL,(xmlChar *)"idled");
-          xmlSetProp(msg,(xmlChar *)"tid",test->id);
-          break;
-        case TEST_LOADED:
-          msg = xmlNewNode(NULL,(xmlChar *)"loaded");
-          xmlSetProp(msg,(xmlChar *)"tid",test->id);
-          break;
-        case TEST_MEASURE:
-          msg = xmlNewNode(NULL,(xmlChar *)"measuring");
-          xmlSetProp(msg,(xmlChar *)"tid",test->id);
-          break;
-        case TEST_ERROR:
-          msg = xmlNewNode(NULL,(xmlChar *)"error");
-          xmlSetProp(msg,(xmlChar *)"tid",test->id);
-          xmlSetProp(msg,(xmlChar *)"err_fn",(xmlChar *)test->err_fn);
-          xmlSetProp(msg,(xmlChar *)"err_str",(xmlChar *)test->err_str);
-          sprintf(code,"%d",test->err_rc);
-          xmlSetProp(msg,(xmlChar *)"err_rc",(xmlChar *)code);
-          sprintf(code,"%d",test->err_no);
-          xmlSetProp(msg,(xmlChar *)"err_no",(xmlChar *)code);
-          break;
-        case TEST_DEAD:
-          msg = xmlNewNode(NULL,(xmlChar *)"dead");
-          xmlSetProp(msg,(xmlChar *)"tid",test->id);
-          id = test->id;
-          break;
-        default:
-          break;
-        }
-        test->state = new;
-        if (msg) {
-          rc = send_control_message(netperf->sock, msg, netperf->id, netperf->my_nid);
-          if (rc != NPE_SUCCESS) {
-            if (debug) {
-              g_fprintf(where,
-                      "%s: send_control_message failed\n", __func__);
-              fflush(where);
-            }
-          }
-        }
-      }
-      test = test->next;
-      if (new == TEST_DEAD) {
-        delete_test(id);
-      }
-    }
-    /* mutex unlocking is not required because only one 
-       netserver thread looks at these data structures sgb */
-  }
-}
-
 /* the periodic callback function that will check test states and
    report back any changes.  it looks very much like the original
    check_test_state() routine - for now anyway. it might be nice to
@@ -842,178 +666,9 @@
        netserver thread looks at these data structures sgb */
   }
 }
-
 
-
-static void
-kill_all_tests()
-{
-  int           i;
-  int           empty_hash_buckets;
-  test_t       *test;
-  test_hash_t  *h;
-
-
-  for (i = 0; i < TEST_HASH_BUCKETS; i ++) {
-    h = &test_hash[i];
-    /* mutex locking is not required because only one 
-       netserver thread looks at these data structures sgb */
-    test = h->test;
-    while (test != NULL) {
-      /* tell each test to die */
-      test->state_req = TEST_DEAD;
-      test = test->next;
-    }
-  }
-  empty_hash_buckets = 0;
-  while(empty_hash_buckets < TEST_HASH_BUCKETS) {
-    empty_hash_buckets = 0;
-    g_usleep(1000000);
-    check_test_state();
-    for (i = 0; i < TEST_HASH_BUCKETS; i ++) {
-      if (test_hash[i].test == NULL) {
-        empty_hash_buckets++;
-      }
-    }
-  }
-}
 
 
-
-static int
-close_netserver()
-{
-  int rc;
-  int loop;
-  server_t     *netperf;
-
-
-  netperf = netperf_hash[0].server;
-  if ((netperf->state_req == NSRV_CLOSE) ||
-      (netperf->state_req == NSRV_EXIT ) ||
-      (netperf->err_rc == NPE_REMOTE_CLOSE)) {
-    xmlNodePtr    msg = NULL;
-    kill_all_tests();
-    msg = xmlNewNode(NULL,(xmlChar *)"closed");
-    if (netperf->state_req == NSRV_CLOSE) {
-      xmlSetProp(msg,(xmlChar *)"flag",(const xmlChar *)"LOOPING");
-      loop = 1;
-    }
-    if (netperf->state_req == NSRV_EXIT) {
-      xmlSetProp(msg,(xmlChar *)"flag",(const xmlChar *)"GONE");
-      loop = 0;
-    }
-    if (msg) {
-      rc = send_control_message(netperf->sock, msg, netperf->id, netperf->my_nid);
-      if (rc != NPE_SUCCESS) {
-        if (debug) {
-          g_fprintf(where,
-                  "%s: send_control_message failed\n", __func__);
-          fflush(where);
-        }
-      }
-    }
-    delete_netperf(netperf->id);
-  } else {
-    /* we should never really get here   sgb  2005-12-06 */
-    g_fprintf(where, "%s entered through some unknown path!!!!\n", __func__);
-    g_fprintf(where, "netperf->state_req = %d \t netperf->err_rc = %d\n",
-            netperf->state_req, netperf->err_rc);
-    fflush(where);
-    exit(-2);
-  }
-  return(loop);
-}
-
-
-static int
-handle_netperf_requests(int sock)
-{
-  int rc = NPE_SUCCESS;
-  struct pollfd fds;
-  xmlDocPtr     message;
-  server_t     *netperf;
-
-  NETPERF_DEBUG_ENTRY(debug,where);
-
-  rc = instantiate_netperf(sock);
-  if (rc != NPE_SUCCESS) {
-    g_fprintf(where,
-            "%s %s: instantiate_netperf  error %d\n",
-            program_name,
-            __func__,
-            rc);
-    fflush(where);
-    exit(rc);
-  }
-  netperf = netperf_hash[0].server;
-  /* mutex locking is not required because only one 
-     netserver thread looks at these data structures sgb */
-  while(netperf->state != NSRV_ERROR) {
-
-    /* check the state of all tests */
-    check_test_state();
-
-    fds.fd      = netperf->sock;
-    fds.events  = POLLIN;
-    fds.revents = 0;
-    /* mutex unlocking is not required because only one 
-       netserver thread looks at these data structures sgb */
-    if (poll(&fds,1,5000) > 0) {
-      /* mutex locking is not required because only one 
-         netserver thread looks at these data structures sgb */
-      rc = recv_control_message(netperf->sock, &message);
-      /* mutex unlocking is not required because only one 
-         netserver thread looks at these data structures sgb */
-      if (rc > 0) {
-        rc = process_message(netperf, message);
-        if (rc) {
-          g_fprintf(where,"process_message returned %d  %s\n",
-                  rc, netperf_error_name(rc));
-          fflush(where);
-        }
-      } else {
-        netperf->state = NSRV_ERROR;
-        netperf->err_fn = (char *)__func__;
-        if (rc == 0) {
-          netperf->err_rc = NPE_REMOTE_CLOSE;
-        } else {
-          g_fprintf(where,"recv_control_message returned %d  %s\n",
-                  rc, netperf_error_name(rc));
-          fflush(where);
-          netperf->err_rc = rc;
-        }
-      }
-    } else {
-      if (debug) {
-        g_fprintf(where,"ho hum, nothing to do\n");
-        fflush(where);
-        report_servers_test_status(netperf);
-      }
-      report_stuck_test_status(netperf);
-    }
-    /* mutex locking is not required because only one 
-       netserver thread looks at these data structures sgb */
-    if (rc != NPE_SUCCESS) {
-      netperf->state  = NSRV_ERROR;
-      netperf->err_rc = rc;
-      netperf->err_fn = (char *)__func__;
-    }
-    if ((netperf->state_req == NSRV_CLOSE) ||
-        (netperf->state_req == NSRV_EXIT ))  {
-      break;
-    }
-  }
-
-  if (rc != NPE_SUCCESS) {
-    report_server_error(netperf);
-  }
-  /* mutex unlocking is not required because only one 
-     netserver thread looks at these data structures sgb */
-  return(close_netserver());
-}
-
-
 /* called when it is time to accept a new control connection off the
    listen endpoint */
 gboolean  accept_connection(GIOChannel *source,
@@ -1088,7 +743,7 @@
       g_clear_error(&error);
     }
     
-    g_fprintf(where,"status after set_encoding %d\n");
+    g_fprintf(where,"status after set_encoding %d\n",status);
     
     g_io_channel_set_buffered(control_channel,FALSE);
     
@@ -1107,13 +762,9 @@
 setup_listen_endpoint(char service[]) {
 
   struct sockaddr   name;
-  struct sockaddr  *peeraddr     = &name;
   int               namelen      = sizeof(name);
   netperf_socklen_t peerlen      = namelen;
-  SOCKET            sock;
   SOCKET            listenfd     = 0;
-  int               loop         = 1;
-  char              filename[PATH_MAX];
 
   NETPERF_DEBUG_ENTRY(debug,where);
 
@@ -1238,7 +889,6 @@
 netserver_init()
 {
   int   i;
-  int   rc;
 
   NETPERF_DEBUG_ENTRY(debug,where);
 
@@ -1548,7 +1198,7 @@
       g_clear_error(&error);
     }
     
-    g_fprintf(where,"status after set_encoding %d\n");
+    g_fprintf(where,"status after set_encoding %d\n",status);
     
     g_io_channel_set_buffered(control_channel,FALSE);
     
@@ -1593,7 +1243,7 @@
       g_clear_error(&error);
     }
     
-    g_fprintf(where,"status after set_encoding %d\n");
+    g_fprintf(where,"status after set_encoding %d\n",status);
     
     g_io_channel_set_buffered(control_channel,FALSE);
     
@@ -1611,5 +1261,6 @@
     g_main_loop_run(loop);
     g_fprintf(where,"Came out of the main loop\n");
   }
+  return(0);
 }
 



More information about the netperf-dev mailing list