[netperf-dev] netperf4 commit notice r93 -
branches/glib_migration/src
raj at netperf.org
raj at netperf.org
Thu Mar 23 14:09:22 PST 2006
Author: raj
Date: 2006-03-23 14:09:21 -0800 (Thu, 23 Mar 2006)
New Revision: 93
Modified:
branches/glib_migration/src/netlib.c
branches/glib_migration/src/netlib.h
branches/glib_migration/src/netmsg.c
branches/glib_migration/src/netperf.h
branches/glib_migration/src/netserver.c
Log:
Egads! This actually completes the default tests successfully with
a netserver that uses glib event loops for accepting connections and
spawns a new netserver process on each control connection. Of course,
the spawned process never terminates and, I think, gets stuck in a loop :)
but this is a milestone worthy of a commit. If I knew how to compile
nettest_bsd.c as a Windows DLL I might be so bold as to try that, but
I suppose it would be best to get things cleaner under Linux/GLib first.
Modified: branches/glib_migration/src/netlib.c
===================================================================
--- branches/glib_migration/src/netlib.c 2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netlib.c 2006-03-23 22:09:21 UTC (rev 93)
@@ -141,6 +141,7 @@
extern test_hash_t test_hash[TEST_HASH_BUCKETS];
extern tset_hash_t test_set_hash[TEST_SET_HASH_BUCKETS];
+
#define HIST void*
#include "nettest_bsd.h"
@@ -149,6 +150,34 @@
#define PATH_MAX MAX_PATH
#endif
+/* a kludge until I can better structure the code */
+extern server_hash_t netperf_hash[];
+
+int
+add_server_to_specified_hash(server_hash_t *hash, server_t *new_netperf, gboolean do_hash) {
+
+ int hash_value;
+
+ if (do_hash) {
+ /* at some point this needs to change :) */
+ hash_value = 0;
+ }
+ else {
+ hash_value = 0;
+ }
+
+ /* don't forget to add error checking one day */
+ NETPERF_MUTEX_LOCK(hash[hash_value].hash_lock);
+
+ new_netperf->next = hash[hash_value].server;
+ new_netperf->lock = hash[hash_value].hash_lock;
+ hash[hash_value].server = new_netperf;
+
+ NETPERF_MUTEX_UNLOCK(hash[hash_value].hash_lock);
+
+ return(NPE_SUCCESS);
+}
+
/* given a filename, return the first path to the file that stats
successfully - which is either the name already given, or that name
with NETPERFDIR prepended. */
@@ -1631,7 +1660,69 @@
{
}
+static gboolean
+allocate_netperf(GIOChannel *source, xmlDocPtr message, gpointer data) {
+ gboolean ret;
+ xmlChar * from_nid;
+ xmlChar * my_nid;
+ xmlNodePtr msg;
+ xmlNodePtr cur;
+ server_t *netperf;
+ global_state_t *global_state;
+ global_state = data;
+
+ msg = xmlDocGetRootElement(message);
+ if (msg == NULL) {
+ g_fprintf(stderr,"empty document\n");
+ ret = FALSE;
+ }
+ else {
+ cur = msg->xmlChildrenNode;
+ if (xmlStrcmp(cur->name,(const xmlChar *)"version")!=0) {
+ if (debug) {
+ g_fprintf(where,
+ "%s: Received an unexpected first message\n", __func__);
+ fflush(where);
+ }
+ ret = FALSE;
+ }
+ else {
+ /* require the caller to ensure the netperf isn't already around */
+ my_nid = xmlStrdup(xmlGetProp(msg,(const xmlChar *)"tonid"));
+ from_nid = xmlStrdup(xmlGetProp(msg,(const xmlChar *)"fromnid"));
+
+ if ((netperf = (server_t *)malloc(sizeof(server_t))) == NULL) {
+ g_fprintf(where,"%s: malloc failed\n", __func__);
+ exit(1);
+ }
+ memset(netperf,0,sizeof(server_t));
+ netperf->id = from_nid;
+ netperf->my_nid = my_nid;
+#ifdef G_OS_WIN32
+ netperf->sock = g_io_channel_win32_get_fd(source);
+#else
+ netperf->sock = g_io_channel_unix_get_fd(source);
+#endif
+ netperf->source = source;
+ netperf->state = NSRV_PREINIT;
+ netperf->state_req = NSRV_WORK;
+#ifdef WITH_GLIB
+ netperf->thread_id = NULL;
+#else
+ netperf->thread_id = -1;
+#endif
+ netperf->next = NULL;
+
+ add_server_to_specified_hash(global_state->server_hash, netperf, FALSE);
+ ret = TRUE;
+ }
+ }
+ return ret;
+}
+
+
+
/* loop and grab all the available bytes, but no more than N from the
source and return. add the number of bytes received to the
bytes_read parameter */
@@ -1677,7 +1768,7 @@
/* given a buffer with a complete control message, XML parse it and
then send it on its way. */
gboolean
-xml_parse_control_message(gchar *message, gsize length, gpointer data) {
+xml_parse_control_message(gchar *message, gsize length, gpointer data, GIOChannel *source) {
xmlDocPtr xml_message;
int rc = NPE_SUCCESS;
@@ -1705,7 +1796,16 @@
message,
xml_message);
}
- /* lookup its destination and send it on its way */
+ /* was this the first message on the control connection? */
+ if (global_state->first_message) {
+ allocate_netperf(source,xml_message,data);
+ global_state->first_message = FALSE;
+ }
+
+ /* lookup its destination and send it on its way. we are
+ ass-u-me-ing that there is only one netperf to be found, which
+ ultimately may not be correct so don't forget to come back here
+ then... */
netperf = global_state->server_hash[0].server;
/* mutex locking is not required because only one netserver thread
looks at these data structures per sgb */
@@ -1864,7 +1964,8 @@
/* we have an entire message, time to process it */
ret = xml_parse_control_message(message_state->buffer,
message_state->bytes_received,
- data);
+ data,
+ source);
/* let us not forget to reset our message_state shall we? we
don't really want to re-parse the same message over and over
again... raj 2006-03-22 */
Modified: branches/glib_migration/src/netlib.h
===================================================================
--- branches/glib_migration/src/netlib.h 2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netlib.h 2006-03-23 22:09:21 UTC (rev 93)
@@ -146,6 +146,7 @@
message_state_t *message_state; /* so we can keep track of partials */
GMainLoop *loop; /* so we can add things to the loop */
gboolean is_netserver; /* not sure if this is really necessary */
+ gboolean first_message; /* do we await the first message? */
} global_state_t;
extern void netlib_init();
Modified: branches/glib_migration/src/netmsg.c
===================================================================
--- branches/glib_migration/src/netmsg.c 2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netmsg.c 2006-03-23 22:09:21 UTC (rev 93)
@@ -129,7 +129,7 @@
};
const struct msgs NS_Msgs[] = {
- /* Message name, function, StateBitMap */
+ /* Message name, function, StateBitMap */
#ifdef OFF
{ "clear", clear_message, 0x00000000 },
{ "error", error_message, 0x00000000 },
@@ -328,7 +328,7 @@
/* versions match */
netperf->state = NSRV_VERS;
netperf->state_req = NSRV_WORK;
- rc = send_version_message(netperf,my_nid);
+ rc = send_version_message(netperf,netperf->my_nid);
} else {
/* versions don't match */
if (debug) {
@@ -509,7 +509,7 @@
rc = send_control_message(server->sock,
stats,
server->id,
- my_nid);
+ server->my_nid);
if (rc != NPE_SUCCESS) {
if (debug) {
fprintf(where,
@@ -544,7 +544,7 @@
rc = send_control_message(server->sock,
sys_stats,
server->id,
- my_nid);
+ server->my_nid);
if (rc != NPE_SUCCESS) {
if (debug) {
fprintf(where,
Modified: branches/glib_migration/src/netperf.h
===================================================================
--- branches/glib_migration/src/netperf.h 2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netperf.h 2006-03-23 22:09:21 UTC (rev 93)
@@ -216,6 +216,9 @@
xmlChar *id; /* the id of the server instance. used
in searches and as sanity checks */
+ xmlChar *my_nid; /* used in netserver, used to be
+ global */
+
NETPERF_RWLOCK_T rwlock; /* the mutex used to ensure exclusive
access to this servers resources */
@@ -228,6 +231,9 @@
SOCKET sock; /* the socket over which we communicate
with the server */
+ GIOChannel *source; /* the control channel over which we
+ communicate with the server */
+
ns_state_t state; /* in what state is this server
presently? */
Modified: branches/glib_migration/src/netserver.c
===================================================================
--- branches/glib_migration/src/netserver.c 2006-03-22 23:10:48 UTC (rev 92)
+++ branches/glib_migration/src/netserver.c 2006-03-23 22:09:21 UTC (rev 93)
@@ -116,7 +116,6 @@
/* gcc does not like intializing where here */
FILE *where;
-xmlChar *my_nid=NULL;
#define NETPERF_HASH_BUCKETS 1
@@ -566,6 +565,7 @@
{
int rc=NPE_BAD_VERSION;
xmlChar * from_nid;
+ xmlChar * my_nid;
xmlDocPtr message;
xmlNodePtr msg;
xmlNodePtr cur;
@@ -606,6 +606,7 @@
}
memset(netperf,0,sizeof(server_t));
netperf->id = from_nid;
+ netperf->my_nid = my_nid;
netperf->sock = sock;
netperf->state = NSRV_CONNECTED;
netperf->state_req = NSRV_WORK;
@@ -638,6 +639,7 @@
} /* end of while */
return(rc);
}
+
static void
@@ -744,7 +746,7 @@
}
test->state = new;
if (msg) {
- rc = send_control_message(netperf->sock, msg, netperf->id, my_nid);
+ rc = send_control_message(netperf->sock, msg, netperf->id, netperf->my_nid);
if (rc != NPE_SUCCESS) {
if (debug) {
g_fprintf(where,
@@ -763,6 +765,104 @@
netserver thread looks at these data structures sgb */
}
}
+
+/* the periodic callback function that will check test states and
+ report back any changes. it looks very much like the original
+ check_test_state() routine - for now anyway. it might be nice to
+ one day get this set such that state change notification was
+ immediate... raj 2006-03-23 */
+
+static void
+check_test_state_callback(gpointer data)
+{
+ int i;
+ uint32_t orig;
+ uint32_t new;
+ int rc;
+ test_t *test;
+ test_hash_t *h;
+ xmlNodePtr msg = NULL;
+ xmlNodePtr new_node;
+ xmlChar *id;
+ server_t *netperf;
+ char code[8];
+
+ netperf = netperf_hash[0].server;
+
+ for (i = 0; i < TEST_HASH_BUCKETS; i ++) {
+ h = &test_hash[i];
+ /* mutex locking is not required because only one
+ netserver thread looks at these data structures sgb */
+ test = h->test;
+ while (test != NULL) {
+ orig = test->state;
+ new = test->new_state;
+ if (orig != new) {
+ /* report change in test state */
+ if (debug) {
+ g_fprintf(where,"%s:tid = %s state %d new_state %d\n",
+ __func__, test->id, orig, new);
+ fflush(where);
+ }
+ switch (new) {
+ case TEST_INIT:
+ /* what kind of error checking do we want to add ? */
+ msg = xmlNewNode(NULL,(xmlChar *)"initialized");
+ xmlSetProp(msg,(xmlChar *)"tid",test->id);
+ new_node = xmlCopyNode(test->dependent_data,1);
+ xmlAddChild(msg,new_node);
+ break;
+ case TEST_IDLE:
+ msg = xmlNewNode(NULL,(xmlChar *)"idled");
+ xmlSetProp(msg,(xmlChar *)"tid",test->id);
+ break;
+ case TEST_LOADED:
+ msg = xmlNewNode(NULL,(xmlChar *)"loaded");
+ xmlSetProp(msg,(xmlChar *)"tid",test->id);
+ break;
+ case TEST_MEASURE:
+ msg = xmlNewNode(NULL,(xmlChar *)"measuring");
+ xmlSetProp(msg,(xmlChar *)"tid",test->id);
+ break;
+ case TEST_ERROR:
+ msg = xmlNewNode(NULL,(xmlChar *)"error");
+ xmlSetProp(msg,(xmlChar *)"tid",test->id);
+ xmlSetProp(msg,(xmlChar *)"err_fn",(xmlChar *)test->err_fn);
+ xmlSetProp(msg,(xmlChar *)"err_str",(xmlChar *)test->err_str);
+ sprintf(code,"%d",test->err_rc);
+ xmlSetProp(msg,(xmlChar *)"err_rc",(xmlChar *)code);
+ sprintf(code,"%d",test->err_no);
+ xmlSetProp(msg,(xmlChar *)"err_no",(xmlChar *)code);
+ break;
+ case TEST_DEAD:
+ msg = xmlNewNode(NULL,(xmlChar *)"dead");
+ xmlSetProp(msg,(xmlChar *)"tid",test->id);
+ id = test->id;
+ break;
+ default:
+ break;
+ }
+ test->state = new;
+ if (msg) {
+ rc = send_control_message(netperf->sock, msg, netperf->id, netperf->my_nid);
+ if (rc != NPE_SUCCESS) {
+ if (debug) {
+ g_fprintf(where,
+ "%s: send_control_message failed\n", __func__);
+ fflush(where);
+ }
+ }
+ }
+ }
+ test = test->next;
+ if (new == TEST_DEAD) {
+ delete_test(id);
+ }
+ }
+ /* mutex unlocking is not required because only one
+ netserver thread looks at these data structures sgb */
+ }
+}
@@ -824,7 +924,7 @@
loop = 0;
}
if (msg) {
- rc = send_control_message(netperf->sock, msg, netperf->id, my_nid);
+ rc = send_control_message(netperf->sock, msg, netperf->id, netperf->my_nid);
if (rc != NPE_SUCCESS) {
if (debug) {
g_fprintf(where,
@@ -982,9 +1082,7 @@
CLOSE_SOCKET(control_socket); /* don't want to leak descriptors */
}
else {
- /* for now, we will simply start processing requests the
- "original" way. later we will establish a recv callback for a
- proper glib event loop */
+ /* lets setup the glib event loop... */
g_fprintf(where,"accepted a connection but told not to spawn\n");
#ifdef G_OS_WIN32
control_channel = g_io_channel_win32_new_socket(control_socket);
@@ -1437,11 +1535,12 @@
global_state_ptr->test_hash = test_hash;
global_state_ptr->message_state = g_malloc(sizeof(message_state_t));
global_state_ptr->is_netserver = TRUE;
+ global_state_ptr->first_message = TRUE;
global_state_ptr->loop = loop;
- global_state_ptr->message_state->have_header = FALSE;
- global_state_ptr->message_state->bytes_received = 0;
+ global_state_ptr->message_state->have_header = FALSE;
+ global_state_ptr->message_state->bytes_received = 0;
global_state_ptr->message_state->bytes_remaining = 4;
- global_state_ptr->message_state->buffer = NULL;
+ global_state_ptr->message_state->buffer = NULL;
if (need_setup) {
listen_sock = setup_listen_endpoint(listen_port);
@@ -1488,11 +1587,13 @@
else {
/* we used to call handle_netperf_requests(sock); here, now we use
the loop, luke... */
+
#ifdef G_OS_WIN32
control_channel = g_io_channel_win32_new_socket(control_socket);
#else
control_channel = g_io_channel_unix_new(control_socket);
#endif
+
status = g_io_channel_set_flags(control_channel,G_IO_FLAG_NONBLOCK,&error);
if (error) {
g_warning("g_io_channel_set_flags %s %d %s\n",
@@ -1524,6 +1625,9 @@
g_print("added watch id %d\n",watch_id);
g_print("Starting loop to process stuff...\n");
+ g_timeout_add(1000,
+ (GSourceFunc)check_test_state_callback,
+ global_state_ptr);
g_main_loop_run(loop);
g_print("Came out of the main loop\n");
}
More information about the netperf-dev mailing list