agent.c

Go to the documentation of this file.
00001 
00008 /* $Id: agent.c,v 1.159 2010/04/23 15:29:40 tbrady Exp $ */
00009 /* $UTK_Copyright: $ */
00010 
00011 #include <stdlib.h>
00012 #include <stdio.h>
00013 #include <netdb.h>
00014 #include <strings.h>
00015 #include <sys/types.h>
00016 #include <sys/time.h>
00017 #include <sys/un.h>
00018 #include <sys/stat.h>
00019 #include <sys/mman.h>
00020 #include <fcntl.h>
00021 #include <string.h>
00022 #include <signal.h>
00023 #include <time.h>
00024 #include <unistd.h>
00025 #include <glob.h>
00026 
00027 #include "utility.h"
00028 #include "proxylib.h"
00029 #include "comm_basics.h"
00030 #include "comm_data.h"
00031 #include "server.h"
00032 #include "agent.h"
00033 #include "problem.h"
00034 #include "comm_encode.h"
00035 #include "gs_storage.h"
00036 #include "mfork.h"
00037 #ifdef GS_SMART_GRIDSOLVE
00038 #include "gs_smart_task_graph.h"
00039 #include "gs_smart_app_pm.h"
00040 #endif
00041 
00042 gs_agent_scheduler_t gs_agent_parse_scheduler_name(char *);
00043 
00046 char 
00047   *agent_cfg = NULL,
00048   GRIDSOLVE_SQLITE_DB[FN_LEN] = "",
00049   GRIDSOLVE_SENSOR_USOCK[FN_LEN] = "",
00050   GRIDSOLVE_STATS_FILE[FN_LEN] = "",
00051   GRIDSOLVE_SCHED_LOCK_FILE[FN_LEN] = "";
00052 
00053 gs_agent_stats_t
00054   gs_agent_stats;
00055 
00056 gs_agent_scheduler_t
00057   gs_agent_scheduler_selection;
00058 
00060 int GS_SENSORFD;
00061 FILE *GS_SENSORFILE;
00062 
00063 int
00064   global_taskid = 0,
00065   htm_scheduler_sync = 1,
00066   keep_track_of_task_events = 0;
00067 
00068 struct timeval
00069   global_start_time = {0, 0};
00070 
00072 sqlite3 *global_db = NULL;
00073 
00080 static void
00081 gs_agent_generic_signal_handler(int sig)
00082 {
00083   if(sig == SIGHUP) {
00084     gs_info_t *sa_list;
00085 
00086     LOGPRINTF("SIGHUP: reading configuration file '%s'\n", agent_cfg);
00087 
00088     if(gs_parse_config_file(agent_cfg, &sa_list) == 0) {
00089       if(sa_list) {
00090         gs_info_t *p;
00091 
00092         for(p = sa_list; p != NULL; p = p->next) {
00093           if(!strcasecmp(p->type, "GS_AGENT_SCHEDULER"))
00094             gs_agent_scheduler_selection = gs_agent_parse_scheduler_name(p->value);
00095           else if(!strcasecmp(p->type, "GS_HTM_SYNC"))
00096             htm_scheduler_sync = atoi(p->value);
00097 
00098           free(p->type);
00099           free(p->value);
00100         }
00101 
00102         free(sa_list);
00103       }
00104     }
00105 
00106     return;
00107   }
00108 
00109   ERRPRINTF("Agent terminating on signal %d.\n", sig);
00110   unlink(GRIDSOLVE_STATS_FILE);
00111   unlink(GRIDSOLVE_SQLITE_DB);
00112   unlink(GRIDSOLVE_SCHED_LOCK_FILE);
00113   exit(0);
00114 }
00115 
/*
 * Temporary SIGHUP handler: forwards SIGHUP to the parent process.
 * The signal number is not used.
 */
static void
gs_temp_sighup_handler(int sig)
{
  pid_t parent = getppid();

  (void) sig;
  kill(parent, SIGHUP);
}
00128 
00136 double
00137 get_time_since_startup()
00138 {
00139   struct timeval tv;
00140   double cur_time, start_time;
00141 
00142   gettimeofday(&tv, NULL);
00143 
00144   cur_time = (double)tv.tv_sec + (((double)tv.tv_usec) / 1000000.0);
00145   start_time = (double)global_start_time.tv_sec + 
00146        (((double)global_start_time.tv_usec) / 1000000.0);
00147 
00148   return cur_time - start_time;
00149 }
00150 
00163 int
00164 gs_agent_create_info_dir(gs_agent_t *gs_agent)
00165 {
00166   ipaddr_t ipaddress;
00167   char dummy_componentid[CID_LEN];
00168 
00169   ipaddress = proxy_get_my_ipaddr();
00170 
00171   if(gs_create_info_dir(dummy_componentid, ipaddress,
00172        gs_agent->port, &(gs_agent->infodir)) < 0) {
00173     ERRPRINTF("Creating agent infodir failed.\n");
00174     return -1;
00175   }
00176 
00177   return 0;
00178 }
00179 
00191 gs_agent_t *
00192 gs_agent_init(char *cfg)
00193 {
00194   gs_agent_t *gs_agent;
00195   int pid;
00196   
00197   gettimeofday(&global_start_time, NULL);
00198 
00199   gs_agent = (gs_agent_t *) calloc(1, sizeof(gs_agent_t));
00200 
00201   if(!gs_agent) return NULL;
00202 
00203   /* Setup agent */
00204   gs_agent->hostname = gs_get_machine_name();
00205   gs_agent->servers = NULL;
00206   gs_agent->dsig = pvmgetdsig();
00207   gs_agent->port = getenv_int("GRIDSOLVE_AGENT_PORT", GRIDSOLVE_AGENT_PORT_DEFAULT);
00208 
00209   if(gs_agent_create_info_dir(gs_agent) < 0) {
00210     ERRPRINTF("Could not create agent infodir.\n");
00211     return NULL;
00212   }
00213 
00214   pid = getpid();
00215 
00216   gs_clean_up_old_temp_files(gs_agent->infodir);
00217 
00218   snprintf(GRIDSOLVE_SQLITE_DB, FN_LEN, "%s/%s.%d", gs_agent->infodir, GRIDSOLVE_SQLITE_DB_PREFIX, pid);
00219   snprintf(GRIDSOLVE_STATS_FILE, FN_LEN, "%s/%s.%d", gs_agent->infodir, GRIDSOLVE_STATS_FILE_PREFIX, pid);
00220   snprintf(GRIDSOLVE_SENSOR_USOCK, FN_LEN, "%s/%s.%d", gs_agent->infodir, GRIDSOLVE_SENSOR_USOCK_PREFIX, pid);
00221   snprintf(GRIDSOLVE_SCHED_LOCK_FILE, FN_LEN, "%s/%s.%d", gs_agent->infodir, GRIDSOLVE_SCHED_LOCK_FILE_PREFIX, pid);
00222 
00223   gs_init_stats_file(gs_agent, GRIDSOLVE_STATS_FILE);
00224 
00225   gs_agent->mysql.host = GRIDSOLVE_DEFAULT_MYSQL_HOST;
00226   gs_agent->mysql.user = GRIDSOLVE_DEFAULT_MYSQL_USER;
00227   gs_agent->mysql.passwd = GRIDSOLVE_DEFAULT_MYSQL_PASSWD;
00228   gs_agent->mysql.db_name = GRIDSOLVE_DEFAULT_MYSQL_DB;
00229   gs_agent->mysql.port = GRIDSOLVE_DEFAULT_MYSQL_PORT;
00230   gs_agent->mysql.unix_socket = GRIDSOLVE_DEFAULT_MYSQL_UNIX_SOCKET;
00231 
00232   if(cfg) {
00233     gs_info_t *sa_list;
00234 
00235     if(gs_parse_config_file(cfg, &sa_list) == 0) {
00236       if(sa_list) {
00237         gs_info_t *p;
00238 
00239         for(p = sa_list; p != NULL; p = p->next) {
00240           if(!strcasecmp(p->type, "GS_MYSQL_HOST"))
00241             gs_agent->mysql.host = p->value;
00242           else if(!strcasecmp(p->type, "GS_MYSQL_USER"))
00243             gs_agent->mysql.user = p->value;
00244           else if(!strcasecmp(p->type, "GS_MYSQL_PASSWD"))
00245             gs_agent->mysql.passwd = p->value;
00246           else if(!strcasecmp(p->type, "GS_MYSQL_DB"))
00247             gs_agent->mysql.db_name = p->value;
00248           else if(!strcasecmp(p->type, "GS_MYSQL_PORT"))
00249             gs_agent->mysql.port = atoi(p->value);
00250           else if(!strcasecmp(p->type, "GS_MYSQL_UNIX_SOCKET"))
00251             gs_agent->mysql.unix_socket = p->value;
00252           else if(!strcasecmp(p->type, "GS_AGENT_SCHEDULER"))
00253             gs_agent_scheduler_selection = gs_agent_parse_scheduler_name(p->value);
00254           else if(!strcasecmp(p->type, "GS_HTM_SYNC"))
00255             htm_scheduler_sync = atoi(p->value);
00256 
00257           /* free only the type field */
00258           free(p->type);
00259         }
00260 
00261         free(sa_list);
00262       }
00263     }
00264   }
00265 
00266   LOGPRINTF("Initialized agent %s\n", gs_agent->hostname);
00267 
00268   return (gs_agent);
00269 }
00270 
00281 int
00282 gs_init_stats_file(gs_agent_t *gs_agent, char *fname)
00283 {
00284   int fd;
00285 
00286   if(!gs_agent || !fname) {
00287     ERRPRINTF("Invalid args\n");
00288     return -1;
00289   }
00290 
00291   if((fd = open(fname, O_RDWR | O_CREAT | O_TRUNC, 0600)) < 0) {
00292     ERRPRINTF("Could not create stats file '%s'\n", fname);
00293     return -1;
00294   }
00295 
00296   gettimeofday(&gs_agent_stats.start_time, NULL);
00297   strncpy(gs_agent_stats.hostname, gs_agent->hostname, GS_MAX_NAMELEN);
00298   gs_agent_stats.hostname[GS_MAX_NAMELEN-1] = '\0';
00299   gs_agent_stats.port = gs_agent->port;
00300 
00301   write(fd, &gs_agent_stats, sizeof(gs_agent_stats_t));
00302 
00303   close(fd);
00304   return 0;
00305 }
00306 
00315 int
00316 gs_agent_process_availability_request(int sock)
00317 {
00318   gs_server_t srv;
00319   char *req;
00320   int s;
00321 
00322   if(gs_recv_string(sock, &req) < 0) {
00323     ERRPRINTF("Error reading request string.\n");
00324     return -1;
00325   }
00326 
00327   if(gs_decode_availability_request(req, &srv) < 0) {
00328     ERRPRINTF("Couldn't decode availability request.\n");
00329     if(req) free(req);
00330     return -1;
00331   }
00332 
00333   if(req) free(req);
00334 
00335   s = gs_connect_to_host(srv.componentid, srv.ipaddress, srv.port,
00336                             srv.proxyip, srv.proxyport);
00337 
00338   if(s == INVALID_SOCKET) {
00339     ERRPRINTF("Server failed connectivity test.\n");
00340     return -1;
00341   }
00342 
00343   if(gs_send_tag(s, GS_PROT_OK) < 0) {
00344     ERRPRINTF("Error sending response tag.\n");
00345     return -1;
00346   }
00347 
00348   gs_close_socket(s);
00349 
00350   return 0;
00351 }
00352 
00364 int
00365 gs_agent_process_problem_list(gs_agent_t *gs_agent, int sock)
00366 {
00367   gs_problem_t **problem_list = NULL;
00368   int i, count;
00369 
00370   if(!gs_agent) {
00371     ERRPRINTF("Invalid arg: null agent\n");
00372     return -1;
00373   }
00374 
00375   count = gs_get_all_problems(gs_agent, &problem_list, &count);
00376 
00377   if(count < 0) {
00378     DBGPRINTF("failed to get list of all problems\n");
00379     return -1;
00380   }
00381 
00382   /* Send number of problems to the client */
00383   if(gs_send_int(sock, count) < 0) {
00384     DBGPRINTF("failed to send number of problems\n");
00385     return -1;
00386   }
00387 
00388   for(i = 0; i < count; i++) {
00389     char *prob = NULL;
00390     if((gs_encode_problem(&prob, problem_list[i]) < 0) ||
00391        (gs_send_string(sock, prob) < 0)) {
00392       FREE(prob);
00393       DBGPRINTF("Failed to send problem list\n");
00394       return -1;
00395     }
00396 
00397     FREE(prob);
00398   }
00399 
00400   for(i = 0; i < count; i++)
00401     gs_free_problem(problem_list[i]);
00402   FREE(problem_list);
00403   return 0;
00404 }
00405 
00417 int
00418 gs_agent_process_server_list(gs_agent_t *gs_agent, int sock)
00419 {
00420   gs_server_t **server_list = NULL;
00421   int i, count;
00422 
00423   if(!gs_agent) {
00424     ERRPRINTF("Invalid arg: null agent\n");
00425     return -1;
00426   }
00427 
00428   count = gs_get_all_servers(gs_agent, &server_list, &count);
00429 
00430   if(count < 0) {
00431     DBGPRINTF("failed to get list of all servers\n");
00432     return -1;
00433   }
00434 
00435   /* Send number of servers to the client */
00436   if(gs_send_int(sock, count) < 0) {
00437     DBGPRINTF("failed to send number of servers\n");
00438     return -1;
00439   }
00440 
00441   for(i = 0; i < count; i++) {
00442     char *srv = NULL;
00443     DBGPRINTF("Encoding server: %s.\n", server_list[i]->hostname);
00444     if((gs_encode_server(&srv, server_list[i]) < 0) ||
00445        (gs_send_string(sock, srv) < 0)) {
00446       FREE(srv);
00447       DBGPRINTF("Failed to send server list \n");
00448       return -1;
00449     }
00450 
00451     FREE(srv);
00452   }
00453 
00454   for(i = 0; i < count; i++)
00455     gs_server_free(server_list[i]);
00456   FREE(server_list);
00457 
00458   return 0;
00459 }
00460 
00473 int
00474 gs_agent_process_server_ping_update(gs_agent_t *gs_agent, int sock)
00475 {
00476   char *cid = NULL, *msg = NULL;
00477 
00478   if(!gs_agent) {
00479     ERRPRINTF("Invalid arg: null agent\n");
00480     return -1;
00481   }
00482 
00483   if(gs_recv_string(sock, &cid) < 0) {
00484     ERRPRINTF("gs_recv_string (of cid) failed\n");
00485     return -1;
00486   }
00487 
00488   if(gs_recv_string(sock, &msg) < 0) {
00489     ERRPRINTF("gs_recv_string (of msg) failed\n");
00490     return -1;
00491   }
00492 
00493   if(gs_update_ping_list(gs_agent, cid, msg) < 0) {
00494     ERRPRINTF("failed to update ping list\n");
00495     return -1;
00496   }
00497 
00498   return 0;
00499 }
00500 
00513 int
00514 gs_agent_process_server_ping_list(gs_agent_t *gs_agent, int sock)
00515 {
00516   gs_server_t **server_list = NULL;
00517   char *cid = NULL;
00518   int i, count;
00519 
00520   if(!gs_agent) {
00521     ERRPRINTF("Invalid arg: null agent\n");
00522     return -1;
00523   }
00524 
00525   if(gs_recv_string(sock, &cid) < 0) {
00526     DBGPRINTF("gs_recv_string failed\n");
00527     return -1;
00528   }
00529 
00530   count = gs_get_server_ping_list(gs_agent, &server_list, cid, &count);
00531 
00532   free(cid);
00533 
00534   if(count < 0) {
00535     DBGPRINTF("failed to get list of all servers\n");
00536     return -1;
00537   }
00538 
00539   /* Send number of servers to the requesting server */
00540   if(gs_send_int(sock, count) < 0) {
00541     DBGPRINTF("failed to send number of servers\n");
00542     return -1;
00543   }
00544 
00545   for(i = 0; i < count; i++) {
00546     char *srv = NULL;
00547     DBGPRINTF("Encoding server: %s.\n", server_list[i]->hostname);
00548     if((gs_encode_server(&srv, server_list[i]) < 0) ||
00549        (gs_send_string(sock, srv) < 0)) {
00550       FREE(srv);
00551       DBGPRINTF("Failed to send server list \n");
00552       return -1;
00553     }
00554 
00555     FREE(srv);
00556   }
00557 
00558   for(i = 0; i < count; i++)
00559     gs_server_free(server_list[i]);
00560   FREE(server_list);
00561 
00562   return 0;
00563 }
00564 
00576 int
00577 gs_agent_process_problem_desc(gs_agent_t * gs_agent, int sock)
00578 {
00579   char *xml_problem = NULL;
00580   gs_server_t **server_list = NULL;
00581   gs_problem_t *problem = NULL;
00582   int i, rc, count = 0;
00583 
00584   if(!gs_agent) {
00585     ERRPRINTF("Invalid arg: null agent\n");
00586     return -1;
00587   }
00588 
00589   problem = (gs_problem_t *) CALLOC(1, sizeof(gs_problem_t));
00590   if(!problem) {
00591     DBGPRINTF("Could not malloc new problem.\n");
00592     return -1;
00593   }
00594 
00595   if(gs_recv_string(sock, &(problem->name)) < 0) {
00596     DBGPRINTF("gs_recv_string failed\n");
00597     FREE(problem);
00598     return -1;
00599   }
00600   DBGPRINTF("prob name = '%s'\n", problem->name);
00601 
00602   count =
00603       gs_get_server_list(gs_agent, problem, NULL, &server_list,
00604                          &count);
00605 
00606   rc = 0;
00607 
00608   if(count <= 0) {
00609     DBGPRINTF("could not find problem %s.\n", problem->name);
00610     if(gs_send_string(sock, "not found") < 0) {
00611       DBGPRINTF("Failed to send problem\n");
00612       rc = -1;
00613     }
00614   }
00615   else {
00616     DBGPRINTF("Encoding problem: %s.\n", problem->name);
00617     if((gs_encode_problem(&xml_problem, problem) < 0) ||
00618        (gs_send_string(sock, xml_problem) < 0)) {
00619       DBGPRINTF("Failed to send problem\n");
00620       rc = -1;
00621     }
00622   }
00623 
00624   FREE(xml_problem);
00625 
00626   for(i = 0; i < count; i++)
00627     gs_server_free(server_list[i]);
00628   free(server_list);
00629 
00630   gs_free_problem(problem);
00631 
00632   return rc;
00633 }
00634 
00650 int
00651 gs_agent_process_problem_submit(gs_agent_t * gs_agent, int sock)
00652 {
00653   char *msg = NULL, cid_string[2 * CID_LEN + 1];
00654   char *client_criteria = NULL;
00655   char *xml_problem = NULL;
00656   gs_server_t **server_list = NULL;
00657   gs_problem_t *problem = NULL;
00658   int i, count = 0, fd;
00659   int sender_dsig = 0;
00660   int sender_major = -1;
00661   int scalar_args_to_be_transferred = 0;
00662   int problem_desc_to_be_transferred = 0;
00663   time_t clock;
00664   struct tm *now;
00665   char subtime[128];
00666 
00667   if(!gs_agent) {
00668     ERRPRINTF("Invalid arg: null agent\n");
00669     return -1;
00670   }
00671 
00672   problem = (gs_problem_t *) CALLOC(1, sizeof(gs_problem_t));
00673   if(!problem) {
00674     DBGPRINTF("Could not malloc new problem.\n");
00675     return -1;
00676   }
00677 
00678   if(gs_recv_string(sock, &msg) < 0) {
00679     DBGPRINTF("gs_agent_process_problem_submit: gs_recv_string failed\n");
00680     return -1;
00681   }
00682   DBGPRINTF("gs_agent_process_problem_submit: msg = '%s'\n", msg);
00683 
00684   if(gs_decode_problem_submit_request(msg, &(problem->name), &sender_dsig, &client_criteria) < 0) {
00685     free(msg);
00686     DBGPRINTF("gs_agent_process_problem_submit: failed to decode request\n");
00687     return -1;
00688   }
00689   DBGPRINTF("problem->name = '%s'\n", problem->name);
00690 
00691   free(msg);
00692 
00693   /* Get the servers and fill in the problem structure.  This must be done
00694      before the problem can be transferred. */
00695   count =
00696       gs_get_server_list(gs_agent, problem, client_criteria, &server_list,
00697                          &count);
00698 
00699   if(client_criteria) {
00700     free(client_criteria);
00701     client_criteria = NULL;
00702   }
00703 
00704   /* Transfer problem descriptor if needed */
00705   if(gs_recv_int(sock, &problem_desc_to_be_transferred) < 0)
00706     goto error_comm;
00707   if(problem_desc_to_be_transferred == 1) {
00708     if(count == 0)
00709       problem->description = strdup(GS_UNKNOWN_PROB);
00710     if(gs_encode_problem(&xml_problem, problem) < 0)
00711       goto error_comm;
00712     if(gs_send_string(sock, xml_problem) < 0)
00713       goto error_comm;
00714     FREE(xml_problem);
00715 
00716     /* if the server count is 0, we have sent a problem struct
00717      * with the description set to GS_UNKNOWN_PROB.  When the
00718      * client receives this, it will abort the request, so we
00719      * bail out here also.
00720      */
00721     if(count == 0) {
00722       gs_free_problem(problem);
00723       return 0;
00724     } 
00725   }
00726 
00727   /* Get scalar args from the client if they are available; used for
00728      evaluating complexity expression */
00729   if(gs_recv_int(sock, &scalar_args_to_be_transferred) < 0)
00730     goto error_comm;
00731 
00732   if(scalar_args_to_be_transferred == 1) {
00733     if(gs_recv_input_scalar_args (sock, problem, sender_dsig, 
00734          gs_agent->dsig, &sender_major) < 0)
00735       goto error_comm;
00736 
00737     if(gs_receiver_compute_arg_sizes(problem, GS_IN) < 0) {
00738       ERRPRINTF("error computing argument sizes.\n");
00739       return -1;
00740     }
00741   }
00742 
00743   if((fd = gs_open_locked_file_timeout(GRIDSOLVE_SCHED_LOCK_FILE, F_WRLCK, 
00744              O_RDWR | O_CREAT, GS_SCHED_LOCK_TIMEOUT)) < 0) {
00745     ERRPRINTF("Timed out on scheduler lock.  Continuing w/o scheduling.\n");
00746   }
00747   else {
00748     /* Agent side scheduling using the problem complexity and server
00749        information occurs here.  The scheduler reorders the server list
00750        in order of best server to worst. */
00751     DBGPRINTF("Calling the agent side scheduler\n");
00752     if(gs_agent_scheduler(problem, count, server_list) < 0) {
00753       DBGPRINTF("Could not sort servers\n");
00754       gs_unlock_file(fd);
00755       return -1;
00756     }
00757 
00758     gs_unlock_file(fd);
00759   }
00760   
00761   if(count <= 0) {
00762     DBGPRINTF("No servers for problem '%s'\n", problem->name);
00763     count = 0;
00764   }
00765 
00766   /* Send task id to the client */
00767   if(gs_send_int(sock, global_taskid) < 0)
00768     goto error_comm;
00769 
00770   /* Send number of servers to the client */
00771   if(gs_send_int(sock, count) < 0)
00772     goto error_comm;
00773 
00774   /* Send servers and problem to client */
00775   if(count > 0) {
00776     char dummy_taskid[TASK_ID_LEN];
00777     double start_time;
00778 
00779     for(i = 0; i < count; i++) {
00780       char *srv = NULL;
00781       DBGPRINTF("Encoding server: %s.\n", server_list[i]->hostname);
00782       if((gs_encode_server(&srv, server_list[i]) < 0) ||
00783          (gs_send_string(sock, srv) < 0)) {
00784         FREE(srv);
00785         DBGPRINTF("Failed to send server list \n");
00786         return -1;
00787       }
00788 
00789       FREE(srv);
00790     }
00791 
00792     DBGPRINTF("Encoding problem: %s.\n", problem->name);
00793     if((gs_encode_problem(&xml_problem, problem) < 0) ||
00794        (gs_send_string(sock, xml_problem) < 0)) {
00795       FREE(xml_problem);
00796       DBGPRINTF("Failed to send problem\n");
00797       return -1;
00798     }
00799     FREE(xml_problem);
00800 
00801     /* assign task to first server in list */
00802 
00803     if(scalar_args_to_be_transferred) {
00804       struct timeval tv;
00805 
00806       gettimeofday(&tv, NULL);
00807       
00808       proxy_cid_to_str(cid_string, server_list[0]->componentid);
00809       start_time = get_time_since_startup();
00810 
00811       srand48((double) tv.tv_usec);
00812 
00813       for(i = 0; i < TASK_ID_LEN-1; i++)
00814         dummy_taskid[i] = (char) ((int)(drand48() * (double)('z' - 'a')) + 'a');
00815       dummy_taskid[TASK_ID_LEN-1] = 0;
00816 
00817       if(gs_insert_submitted_task_guess(cid_string, dummy_taskid, global_taskid,
00818            start_time, server_list[0]->score, 0.0, start_time, 0, 0) < 0)
00819         ERRPRINTF("Warning: error inserting task\n");
00820     }
00821   }
00822 
00823   LOGPRINTF("Agent %s got %s request from client and sent %d servers\n",
00824             gs_agent->hostname, problem->name, count);
00825   
00826   clock = time(0);
00827   now = localtime(&clock);
00828   snprintf(subtime, 1024, "[(%02d/%02d/%04d) %02d:%02d:%02d]",
00829            now->tm_mon+1,now->tm_mday,now->tm_year+1900, 
00830            now->tm_hour,now->tm_min,now->tm_sec);
00831 
00832   /* Heuristic: Temporarily increase the workload on the first server.
00833    * This assumes that the first server will be assigned the work and
00834    * we want to avoid assigning more work there till an new updated
00835    * workload report is received. */
00836   if (server_list != NULL && count > 0) {
00837     server_list[0]->workload += 100;
00838     if (gs_update_workload(gs_agent, server_list[0]) < 0) 
00839       DBGPRINTF("Could not temporarily update workload for the first server \n");
00840     proxy_cid_to_str(cid_string, server_list[0]->componentid);
00841   }
00842   
00843   for(i = 0; i < count; i++)
00844     gs_server_free(server_list[i]);
00845   if(server_list) free(server_list);
00846 
00847   gs_free_problem_and_data(problem);
00848 
00849   return 0;
00850 
00851 error_comm:
00852   for(i = 0; i < count; i++)
00853     gs_server_free(server_list[i]);
00854   if(server_list) free(server_list);
00855   gs_free_problem_and_data(problem);
00856   ERRPRINTF("Error handling problem submission\n");
00857   return -1;
00858 }
00859 
/*
 * Applies a batch of performance-model updates reported by server
 * `srv_cid`.
 *
 * `model_update` is newline-separated.  The first line holds the number of
 * updates.  Each following line is decoded by gs_decode_model_update() into
 * a name (presumably a problem name -- confirm) and an expression, which
 * gs_update_perf_expr() then stores for srv_cid.
 *
 * Entries that cannot be decoded or stored are logged and skipped.  A report
 * with fewer lines than announced is processed up to its last line.
 *
 * Returns 0, or -1 if model_update is NULL or cannot be copied.
 */
int
gs_agent_update_all_perf_models(char *model_update, char *srv_cid)
{
  int i, num_updates;
  char *mcopy, *tok;

  if(!model_update)
    return -1;

  /* strtok() modifies its input, so work on a private copy. */
  mcopy = strdup(model_update);

  if(!mcopy)
    return -1;

  tok = strtok(mcopy, "\n");

  if(!tok) {
    /* Empty report: nothing to update (atoi(NULL) would crash). */
    free(mcopy);
    return 0;
  }

  num_updates = atoi(tok);

  for(i=0;i<num_updates;i++) {
    char *name, *expr;

    tok = strtok(NULL, "\n");

    if(!tok) {
      ERRPRINTF("Performance model update truncated: expected %d, got %d\n",
                num_updates, i);
      break;
    }

    if(gs_decode_model_update(tok, &name, &expr) < 0) {
      ERRPRINTF("Error parsing performance model update\n");
      continue;
    }

    if(gs_update_perf_expr(srv_cid, name, expr) < 0) {
      ERRPRINTF("Error updating performance model\n");
    }

    free(name);
    free(expr);
  }

  free(mcopy);

  return 0;
}
00900 
00912 int
00913 gs_agent_process_workload_report(gs_agent_t * gs_agent, int sock)
00914 {
00915   char *msg, temp_cid[CID_LEN * 2 + 1];
00916   gs_server_t srv;
00917 
00918   if(!gs_agent) {
00919     ERRPRINTF("Invalid arg: null agent\n");
00920     return -1;
00921   }
00922 
00923   if(gs_recv_string(sock, &msg) < 0) {
00924     DBGPRINTF("gs_agent_process_workload_report: gs_recv_string failed\n");
00925     return -1;
00926   }
00927 
00928   if(gs_decode_workload_report(msg, &srv.workload, &srv.nproblems, 
00929       temp_cid) < 0) 
00930   {
00931     DBGPRINTF("Failed to decode workload report\n");
00932     free(msg);
00933     return -1;
00934   }
00935 
00936   free(msg);
00937 
00938   if(gs_recv_string(sock, &msg) < 0) {
00939     DBGPRINTF("gs_agent_process_workload_report: gs_recv_string failed\n");
00940     return -1;
00941   }
00942 
00943   gs_agent_update_all_perf_models(msg, temp_cid);
00944 
00945   free(msg);
00946 
00947   proxy_str_to_cid(srv.componentid, temp_cid);
00948   DBGPRINTF("gs_agent_process_workload_report: workload = %d serverID %s\n",
00949             srv.workload, temp_cid);
00950 
00951   if(gs_update_workload(gs_agent, &srv) < 0) {
00952     DBGPRINTF("gs_agent_process_workload_report: report from unknown server\n");
00953     if(gs_send_tag(sock, GS_PROT_UNKNOWN_SERVER) < 0)
00954       return -1;
00955   }
00956   else {                        /* Everything is OK */
00957     if(gs_send_tag(sock, GS_PROT_OK) < 0)
00958       return -1;
00959   }
00960 
00961   DBGPRINTF("Agent %s handled a workload report setting server %s to %d\n",
00962             gs_agent->hostname, temp_cid, srv.workload);
00963   return 0;
00964 }
00965 
00977 int
00978 gs_agent_process_server_registration(gs_agent_t * gs_agent, int sock)
00979 {
00980   char *serverstr = NULL;
00981   gs_server_t *gs_server;
00982   time_t clock;
00983   struct tm *now;
00984   char subtime[128];
00985 
00986   if(!gs_agent) {
00987     ERRPRINTF("Invalid arg: null agent\n");
00988     return -1;
00989   }
00990 
00991   gs_server = (gs_server_t *) CALLOC(1, sizeof(gs_server_t));
00992 
00993   if(!gs_server) {
00994     ERRPRINTF("Failed to malloc server struct\n");
00995     return -1;
00996   }
00997 
00998   DBGPRINTF("Entering \n");
00999 
01000   /* Receive server string and convert into structure */
01001   if((gs_recv_string(sock, &serverstr) < 0) ||
01002      (gs_decode_server(serverstr, gs_server) < 0))
01003     goto error;
01004 
01005   FREE(serverstr);
01006 
01007   gs_server->last_update = time(NULL);
01008 
01009   /* Actual storage routine */
01010   ASSERT_EXPR((gs_add_server(gs_agent, gs_server) >= 0), goto error);
01011 
01012   /* Send reply tag */
01013   if(gs_send_tag(sock, GS_PROT_OK) < 0)
01014     goto error;
01015 
01016   LOGPRINTF("Agent %s registered server %s\n", gs_agent->hostname, gs_server->hostname);
01017   clock = time(0);
01018   now = localtime(&clock);
01019   snprintf(subtime, 1024, "[(%02d/%02d/%04d) %02d:%02d:%02d]",
01020            now->tm_mon+1,now->tm_mday,now->tm_year+1900, 
01021            now->tm_hour,now->tm_min,now->tm_sec);
01022 
01023   gs_server_free(gs_server);
01024 
01025   return 0;
01026 
01027 error:
01028   ERRPRINTF("Error registering server\n");
01029   gs_server_free(gs_server);
01030   FREE(serverstr);
01031   gs_send_tag(sock, GS_PROT_ERROR);
01032   return -1;
01033 }
01034 
01047 int
01048 gs_agent_task_terminated(char *server_cid, char *request_id, int agent_taskid,
01049   double run_time)
01050 {
01051   gs_htm_task *task;
01052   double end_time;
01053 
01054   end_time = get_time_since_startup();
01055 
01056   if(gs_insert_completed_task(server_cid, request_id, agent_taskid, end_time, 
01057       0.0, 0.0, end_time, 0, 1) < 0)
01058     ERRPRINTF("Warning: could not insert completed task\n");
01059 
01060   task = (gs_htm_task *) malloc(sizeof(gs_htm_task));
01061 
01062   if(!task) {
01063     ERRPRINTF("malloc...\n");
01064     /* in this case don't need to return an error since it's not fatal */
01065     return 0;
01066   }
01067 
01068   if(gs_get_task_by_agent_taskid(agent_taskid, task) < 0) {
01069     free(task);
01070     /* also not fatal */
01071     return 0;
01072   }
01073 
01074   LOGPRINTF("task %d: total time = %lf, server run time = %lf\n", agent_taskid,
01075     end_time-task->start, run_time);
01076 
01077   free(task);
01078 
01079   return 0;
01080 }
01081 
01093 int
01094 gs_agent_process_notify_cancel(int sock)
01095 {
01096   char *msg, *server_cid, *request_id;
01097   int agent_taskid;
01098 
01099   if(gs_recv_string(sock, &msg) < 0) {
01100     ERRPRINTF("Error communicating with server.\n");
01101     return -1;
01102   }
01103 
01104   LOGPRINTF("msg = '%s'\n", msg);
01105   SENSORPRINTF("CANCEL %s\n", msg);
01106 
01107   /* if not keeping track of task events, bail out now. */
01108   if(!keep_track_of_task_events) {
01109     free(msg);
01110     return 0;
01111   }
01112 
01113   server_cid = (char *)malloc(strlen(msg));
01114 
01115   if(!server_cid) {
01116     ERRPRINTF("malloc failed.\n");
01117     return -1;
01118   }
01119 
01120   request_id = (char *)malloc(strlen(msg));
01121 
01122   if(!request_id) {
01123     ERRPRINTF("malloc failed.\n");
01124     free(server_cid);
01125     return -1;
01126   }
01127 
01128   if(gs_decode_cancel_notification(msg, &server_cid, &request_id, 
01129        &agent_taskid) < 0)
01130     ERRPRINTF("Could not decode cancel notifiction.\n");
01131   else
01132     gs_agent_task_terminated(server_cid, request_id, agent_taskid, 0.0);
01133 
01134   free(request_id);
01135   free(server_cid);
01136   free(msg);
01137 
01138   return 0;
01139 }
01140 
01152 int
01153 gs_agent_process_notify_failure(int sock)
01154 {
01155   char *msg, *server_cid, *request_id;
01156   int agent_taskid;
01157 
01158   if(gs_recv_string(sock, &msg) < 0) {
01159     ERRPRINTF("Error communicating with server.\n");
01160     return -1;
01161   }
01162 
01163   LOGPRINTF("msg = '%s'\n", msg);
01164   SENSORPRINTF("FAIL %s\n", msg);
01165 
01166   /* if not keeping track of task events, bail out now. */
01167   if(!keep_track_of_task_events) {
01168     free(msg);
01169     return 0;
01170   }
01171 
01172   server_cid = (char *)malloc(strlen(msg));
01173 
01174   if(!server_cid) {
01175     ERRPRINTF("malloc failed.\n");
01176     return -1;
01177   }
01178 
01179   request_id = (char *)malloc(strlen(msg));
01180 
01181   if(!request_id) {
01182     ERRPRINTF("malloc failed.\n");
01183     free(server_cid);
01184     return -1;
01185   }
01186 
01187   if(gs_decode_cancel_notification(msg, &server_cid, &request_id, 
01188        &agent_taskid) < 0)
01189     ERRPRINTF("Could not decode failure notifiction.\n");
01190   else
01191     gs_agent_task_terminated(server_cid, request_id, agent_taskid, 0.0);
01192 
01193   free(request_id);
01194   free(server_cid);
01195   free(msg);
01196 
01197   return 0;
01198 }
01199 
01211 int
01212 gs_agent_process_notify_complete(int sock)
01213 {
01214   char *msg, *server_cid, *request_id;
01215   double service_et;
01216   int agent_taskid;
01217 
01218   if(gs_recv_string(sock, &msg) < 0) {
01219     ERRPRINTF("Error communicating with server.\n");
01220     return -1;
01221   }
01222 
01223   LOGPRINTF("msg = '%s'\n", msg);
01224   SENSORPRINTF("COMPLE %s\n", msg);
01225 
01226   /* if not keeping track of task events, bail out now. */
01227   if(!keep_track_of_task_events) {
01228     free(msg);
01229     return 0;
01230   }
01231 
01232   server_cid = (char *)malloc(strlen(msg));
01233 
01234   if(!server_cid) {
01235     ERRPRINTF("malloc failed.\n");
01236     return -1;
01237   }
01238 
01239   request_id = (char *)malloc(strlen(msg));
01240 
01241   if(!request_id) {
01242     ERRPRINTF("malloc failed.\n");
01243     free(server_cid);
01244     return -1;
01245   }
01246 
01247   if(gs_decode_problem_complete_notification(msg, &server_cid, &request_id,
01248        &agent_taskid, &service_et) < 0)
01249     ERRPRINTF("Could not decode completion notifiction.\n");
01250   else
01251     gs_agent_task_terminated(server_cid, request_id, agent_taskid, service_et);
01252 
01253   free(request_id);
01254   free(server_cid);
01255   free(msg);
01256 
01257   return 0;
01258 }
01259 
01260 int
01261 gs_dump_task_list(char *server_cid)
01262 {
01263   gs_htm_task **tasks = NULL;
01264   int i, count;
01265 
01266   gs_get_tasks_for_server(server_cid, &tasks, &count, 1);
01267 
01268   if(count < 0) {
01269     ERRPRINTF("failed to get list of all tasks\n");
01270     return -1;
01271   }
01272 
01273   printf("task list for '%s':\n", server_cid);
01274 
01275   for(i=0;i<count;i++) {
01276     printf("%s:\n", tasks[i]->id);
01277     printf("start=%10.2lf, duration=%10.2lf, remaining=%10.2lf, end=%10.2lf, active=%d, finished=%d\n",
01278       tasks[i]->start, tasks[i]->duration, tasks[i]->remaining,
01279       tasks[i]->end, tasks[i]->active, tasks[i]->finished);
01280 
01281     free(tasks[i]);
01282   }
01283 
01284   free(tasks);
01285 
01286   return 0;
01287 }
01288 
01306 int
01307 gs_agent_reassign_task(char *cid_string, char *taskid, int agent_taskid, 
01308   double duration, double agent_est_time)
01309 {
01310   double start_time;
01311   gs_htm_task *task;
01312 
01313   task = (gs_htm_task *) malloc(sizeof(gs_htm_task));
01314 
01315   if(!task) {
01316     ERRPRINTF("malloc...\n");
01317     return -1;
01318   }
01319 
01320   start_time = get_time_since_startup();
01321 
01322   if(agent_taskid == -1) {
01323 
01324     if(gs_insert_submitted_task(cid_string, taskid, agent_taskid, start_time,
01325           duration, agent_est_time, start_time, 0, 0) < 0)
01326     {
01327       free(task);
01328       return -1;
01329     }
01330   }
01331   else {
01332     if(gs_get_task_by_agent_taskid(agent_taskid, task) < 0) {
01333 
01334       /* the task was not already assigned, so insert it now */
01335 
01336       if(gs_insert_submitted_task(cid_string, taskid, agent_taskid,
01337            start_time, duration, agent_est_time, start_time, 0, 0) < 0)
01338       {
01339         free(task);
01340         return -1;
01341       }
01342     }
01343     else {
01344       /* found the task in the database, just update the server assignment
01345        * if necessary.
01346        */
01347 
01348       LOGPRINTF("end time from db = %g, agent est end time = %g\n",
01349           task->end, agent_est_time);
01350 
01351       if(gs_update_task(cid_string, task->id, taskid, agent_taskid, task->start,
01352            duration, task->remaining, task->end, task->active,
01353            task->finished) < 0)
01354       {
01355         free(task);
01356         return -1;
01357       }
01358     }
01359   }
01360 
01361   free(task);
01362 
01363   return 0;
01364 }
01365 
01377 int
01378 gs_agent_process_notify_submit(int sock)
01379 {
01380   char *msg, *problem_name, *server_cid, *user_name, *host_name, 
01381     *client_cid, *request_id;
01382   int agent_taskid;
01383   double est_time, agent_est_time;
01384 
01385   if(gs_recv_string(sock, &msg) < 0) {
01386     ERRPRINTF("Error communicating with server.\n");
01387     return -1;
01388   }
01389 
01390   if(!msg) {
01391     ERRPRINTF("Bad notification message.\n");
01392     return -1;
01393   }
01394 
01395   LOGPRINTF("msg = '%s'\n", msg);
01396   SENSORPRINTF("SUBMIT %s\n", msg);
01397 
01398   /* if not keeping track of task events, bail out now. */
01399   if(!keep_track_of_task_events) {
01400     free(msg);
01401     return 0;
01402   }
01403 
01404   if(gs_decode_problem_solve_notification(msg, &problem_name, &est_time,
01405       &server_cid, &user_name, &host_name, &client_cid, &request_id, 
01406       &agent_taskid, &agent_est_time) < 0) {
01407     free(msg);
01408     ERRPRINTF("Error communicating with server.\n");
01409     return -1;
01410   }
01411 
01412   LOGPRINTF("in agent, est time = %lf, agent est time = %lf\n", est_time, agent_est_time);
01413 
01414   if(gs_agent_reassign_task(server_cid, request_id, agent_taskid, est_time, agent_est_time) < 0)
01415     ERRPRINTF("Warning: could not assign task\n");
01416     
01417   free(msg);
01418   free(problem_name);
01419   free(server_cid);
01420   free(user_name);
01421   free(host_name );
01422   free(client_cid);
01423   free(request_id);
01424 
01425   return 0;
01426 }
01427 
01440 int
01441 gs_agent_process_problem_registration(gs_agent_t * gs_agent, int sock)
01442 {
01443   char *problemstr, *temp_cid, **model_strings, **removed_probs;
01444   gs_problem_t **problem = NULL;
01445   int i, num_services, num_removed;
01446 
01447   if(!gs_agent) {
01448     ERRPRINTF("Invalid arg: null agent\n");
01449     return -1;
01450   }
01451 
01452   if(gs_recv_string(sock, &temp_cid) < 0) {
01453     ERRPRINTF("Error communicating with server.\n");
01454     return -1;
01455   }
01456 
01457   if(gs_recv_int(sock, &num_services) < 0) {
01458     ERRPRINTF("Error communicating with server.\n");
01459     return -1;
01460   }
01461 
01462   problem = (gs_problem_t **)calloc(num_services, sizeof(gs_problem_t *));
01463   if(!problem) {
01464     ERRPRINTF("malloc failed.\n");
01465     return -1;
01466   }
01467 
01473   model_strings = (char **)calloc(num_services+1, sizeof(char *));
01474   if(!model_strings) {
01475     ERRPRINTF("malloc failed.\n");
01476     return -1;
01477   }
01478 
01479   removed_probs = (char **)calloc(num_services+1, sizeof(char *));
01480   if(!removed_probs) {
01481     ERRPRINTF("malloc failed.\n");
01482     return -1;
01483   }
01484 
01485   i = 0;
01486   for(;;) {
01487     /* Receive string and convert into structure */
01488     if(gs_recv_string(sock, &problemstr) < 0) {
01489       ERRPRINTF("Error communicating with the server\n");
01490       goto error;
01491     }
01492 
01493     /* check whether this is the last problem */
01494     if(!strcmp(problemstr, GS_END_PROB_REG)) {
01495       FREE(problemstr);
01496       break;
01497     }
01498 
01499     problem[i] = (gs_problem_t *) CALLOC(1, sizeof(gs_problem_t));
01500 
01501     if(!problem[i]) {
01502       ERRPRINTF("Failed to allocate problem struct\n");
01503       goto error;
01504     }
01505 
01506     if(gs_decode_problem(problemstr, problem[i]) < 0) {
01507       ERRPRINTF("Error decoding problem struct\n");
01508       FREE(problemstr);
01509       goto error;
01510     }
01511 
01512     FREE(problemstr);
01513 
01514     LOGPRINTF("Received problem %s\n", problem[i]->name);
01515 
01516     i++;
01517   }
01518 
01519   /* num_services is sent as the total number of services on disk at the server,
01520    * but the actual number of sent services is 'i'.
01521    */
01522   num_services = i;
01523 
01524   if(num_services > 0)
01525     LOGPRINTF("Received %d new services\n", num_services);
01526 
01527   i = 0;
01528   for(;;) {
01529     if(gs_recv_string(sock, &model_strings[i]) < 0) {
01530       ERRPRINTF("Error communicating with the server\n");
01531       goto error;
01532     }
01533     
01534     /* check whether this is the last problem */
01535     if(!strcmp(model_strings[i], GS_END_PROB_REG))
01536       break;
01537 
01538     i++;
01539   }
01540 
01541   i = 0;
01542   for(;;) {
01543 
01544     if(gs_recv_string(sock, &removed_probs[i]) < 0) {
01545       ERRPRINTF("Error communicating with the server\n");
01546       goto error;
01547     }
01548 
01549     /* check whether this is the last problem */
01550     if(!strcmp(removed_probs[i], GS_END_PROB_REG)) {
01551       break;
01552     }
01553 
01554     i++;
01555   }
01556 
01557   num_removed = i;
01558 
01559   /* Actual storage */
01560   if(gs_register_problem_changes(gs_agent, problem, num_services,
01561         model_strings, removed_probs, num_removed, temp_cid) < 0) {
01562     ERRPRINTF("Failed to add problem to database\n");
01563     goto error;
01564   }
01565 
01566   /* Send reply tag */
01567   if(gs_send_tag(sock, GS_PROT_OK) < 0)
01568     goto error;
01569 
01570   FREE(temp_cid);
01571   return 0;
01572 
01573 error:
01574   ERRPRINTF("Agent could not process problem registration \n");
01575 
01576   if(problem) {
01577     for(i=0;i<num_services;i++)
01578       if(problem[i])
01579         gs_free_problem(problem[i]);
01580     free(problem);
01581   }
01582 
01583   FREE(temp_cid);
01584   gs_send_tag(sock, GS_PROT_ERROR);
01585   return -1;
01586 }
01587 
01598 int
01599 gs_agent_process_kill_agent(gs_agent_t *gs_agent, int sock)
01600 {
01601   char *msg;
01602 
01603   if(!gs_agent) {
01604     ERRPRINTF("Invalid arg: null agent\n");
01605     return -1;
01606   }
01607 
01608   /* If authenticated, authorize to kill */
01609 
01610   if(gs_recv_string(sock, &msg) < 0) {
01611     ERRPRINTF("Error receving Password \n");
01612     return -1;
01613   }
01614 
01615   if(!strcmp(msg, "GridSolve")) {
01616     free(msg);
01617 
01618     if(gs_send_tag(sock, GS_PROT_OK) < 0) {
01619       ERRPRINTF("Error sending confirmation \n");
01620       return -1;
01621     }
01622 
01623     ERRPRINTF("Agent terminating...\n");
01624 
01625 
01626     gs_close_socket(gs_agent->sock);
01627 
01628     kill(getppid(),SIGTERM);
01629   } 
01630   else {
01631     free(msg);
01632 
01633     if(gs_send_tag(sock, GS_PROT_ERROR) < 0) {
01634       ERRPRINTF("Error sending confirmation \n");
01635       return -1;
01636     }
01637   }
01638 
01639   return 0;
01640 }
01641 
01642 
01653 int
01654 gs_agent_process_kill_server(gs_agent_t *gs_agent, int sock)
01655 {
01656   gs_server_t **server_list = NULL;
01657   int i, count, tag;
01658   char *msg;
01659 
01660   if(!gs_agent) {
01661     ERRPRINTF("Invalid arg: null agent\n");
01662     return -1;
01663   }
01664 
01668   if(gs_recv_string(sock, &msg) < 0) {
01669     ERRPRINTF("Error receving Server\n");
01670     return -1;
01671   }
01672 
01673   if((strlen(msg) >= 2) && !strncmp(msg, "-c", 2)) {
01674     LOGPRINTF("GS_PROT_KILL_SERVER request for server cid '%s': ", msg+2);
01675 
01676     server_list = (gs_server_t **) calloc(1, sizeof(gs_server_t *));
01677     if(!server_list) {
01678       ERRPRINTF("malloc failed\n");
01679       free(msg);
01680       return -1;
01681     }
01682 
01683     server_list[0] = (gs_server_t *) calloc(1, sizeof(gs_server_t));
01684     if(!server_list[0]) {
01685       ERRPRINTF("malloc failed\n");
01686       free(server_list);
01687       free(msg);
01688       return -1;
01689     }
01690 
01691     if(gs_get_server_by_cid(gs_agent, msg+2, server_list[0]) < 0) {
01692       ERRPRINTF("No such server: '%s'.\n", msg+2);
01693 
01694       free(server_list[0]);
01695       free(server_list);
01696       free(msg);
01697 
01698       if(gs_send_tag(sock, GS_PROT_UNKNOWN_SERVER) < 0) {
01699         ERRPRINTF("Unsuccessful (Sending No Server tag)\n");
01700         return -1;
01701       }
01702 
01703       return -1;
01704     }
01705 
01706     count = 1;
01707   }
01708   else {
01709     LOGPRINTF("GS_PROT_KILL_SERVER request for server '%s': ", msg);
01710 
01711     count = gs_get_all_servers_by_hostname(gs_agent, msg, &server_list, &count);
01712 
01713     if(count < 1) {
01714       ERRPRINTF("No such server: '%s'.\n", msg);
01715 
01716       free(msg);
01717 
01718       if(gs_send_tag(sock, GS_PROT_UNKNOWN_SERVER) < 0) {
01719         ERRPRINTF("Unsuccessful (Sending No Server tag)\n");
01720         return -1;
01721       }
01722 
01723       return -1;
01724     }
01725 
01726     if(count > 1) {
01727       ERRPRINTF("Multiple matches for server: '%s'.\n", msg);
01728 
01729       free(msg);
01730 
01731       for(i = 0; i < count; i++)
01732         gs_server_free(server_list[i]);
01733       FREE(server_list);
01734 
01735       if(gs_send_tag(sock, GS_PROT_MULTIPLE_SERVER) < 0) {
01736         ERRPRINTF("Unsuccessful (Sending Multiple Server tag)\n");
01737         return -1;
01738       }
01739 
01740       return -1;
01741     }
01742   }
01743 
01744   free(msg);
01745 
01746   if(gs_send_tag(sock, GS_PROT_OK) < 0) {
01747     ERRPRINTF("Unsuccessful (Sending No Server)\n");
01748     return -1;
01749   }
01750 
01751   if(gs_encode_server(&msg, server_list[0]) < 0) {
01752     ERRPRINTF("Unsuccessful (Encoding Server)\n");
01753     gs_server_free(server_list[0]);
01754     return -1;
01755   }
01756 
01757   if(gs_send_string(sock, msg) < 0) {
01758     ERRPRINTF("Unsuccessful (Sending Server)\n");
01759     free(msg);
01760     gs_server_free(server_list[0]);
01761     return -1;
01762   }
01763 
01764   free(msg);
01765 
01766   if(gs_recv_tag(sock, &tag) < 0) {
01767     ERRPRINTF("Error receving Confirmation\n");
01768     return -1;
01769   }
01770 
01771   if(tag != GS_PROT_OK) {
01772     ERRPRINTF("Unsuccessful (rejected response from client).\n");
01773     gs_server_free(server_list[0]);
01774     return -1;
01775   }
01776 
01777   gs_delete_server(gs_agent, server_list[0]);
01778   gs_server_free(server_list[0]);
01779 
01780   LOGPRINTF("Server successfully killed.\n");
01781 
01782   FREE(server_list);
01783   return 0;
01784 }
01785 
01786 
01787 
01788 #ifdef GS_SMART_GRIDSOLVE
01789 
01806 int gs_smart_fault_update_pm(gs_agent_t * gs_agent, int sock){
01807  
01808 
01809 
01810   char *serverstr = NULL;
01811   gs_server_t *gs_server;
01812   time_t clock;
01813   struct tm *now;
01814   char subtime[128];
01815 
01816   if(!gs_agent) {
01817     ERRPRINTF("Invalid arg: null agent\n");
01818     return -1;
01819   }
01820 
01821   gs_server = (gs_server_t *) CALLOC(1, sizeof(gs_server_t));
01822 
01823   if(!gs_server) {
01824     ERRPRINTF("Failed to malloc server struct\n");
01825     return -1;
01826   }
01827 
01828   DBGPRINTF("Entering \n");
01829 
01830   /* Receive server string and convert into structure */
01831   if((gs_recv_string(sock, &serverstr) < 0) ||
01832      (gs_decode_server(serverstr, gs_server) < 0))
01833     return -1;
01834 
01835   FREE(serverstr);
01836 
01837 
01838  
01839   LOGPRINTF("SMART: Server %s has failed during execution .\n", gs_server->hostname);
01840       
01841   if(gs_delete_server(gs_agent, gs_server) < 0){
01842     ERRPRINTF("Failed to delete server '%s'\n", gs_server->hostname);
01843     return -1;        
01844   }
01845   LOGPRINTF("SMART: Server %s has been removed from database .\n", gs_server->hostname);
01846 
01847   gs_server_free(gs_server);
01848 
01849 
01850   if(gs_send_tag(sock, GS_PROT_OK) < 0)
01851     return -1;
01852 
01853 
01854 }
01855 
01856 
01857 
01858 
01859 
01860 
01861 
01862 
01863 
01864 
01865 /*
01866  * This function adds the idl information to the application performance model.
01867  * This idl information is added to the problem structure of each app_pm node when 
01868  * the gs_get_server_list is called.
01869  *
01870  *
01871  * @param gs_agent --- The agent data structure
01872  * @param app_pm -- The application performance model
01873  *
01874  * @returns 0 on success, -1 on failure
01875  *
01876  */
01877 
01878 int gs_smart_add_idl_info_to_app_pm(gs_agent_t * gs_agent , gs_smart_app_pm * app_pm){
01879   gs_server_t **server_list;
01880   int count;
01881   int i;
01882   char * app_pm_problem;
01883   
01884   for(i=0; i<app_pm->nb_tasks;i++){
01885     app_pm->tasks[i]->problem = (gs_problem_t *)calloc(1, sizeof(gs_problem_t));
01886     if(!app_pm->tasks[i]->problem){
01887       ERRPRINTF("SMART: Error allocating to problem %d\n", i);
01888       return -1;
01889     }
01890     app_pm->tasks[i]->problem->name=app_pm->tasks[i]->nickname;
01891     if(gs_encode_problem(&app_pm_problem, app_pm->tasks[i]->problem) < 0){
01892         ERRPRINTF("SMART: Error encoding problem\n");
01893         return -1;
01894       }
01895 
01896     count = gs_get_server_list(gs_agent, app_pm->tasks[i]->problem, NULL, &server_list, &count);
01897   }
01898   return 0;
01899 
01900 }
01901 
01902 
01903 /*
01904  * This function adds the list of servers that can execute each remote task 
01905  * to each remote task node of the task graph. 
01906  *
01907  *
01908  * @param gs_agent --- The agent data structure
01909  * @param task -- The task graph structure which represents the group of task
01910  *                which will be mapped.
01911  *
01912  * @returns 0 on success, -1 on failure
01913  *
01914  */
01915 
01916 
01917 
01918 int gs_smart_add_servers_to_tg(gs_agent_t * gs_agent, gs_smart_tg * task_graph){
01919   int i, count, node_index=0;
01920   gs_server_t ***server_list;
01921   char *client_criteria = NULL;
01922   server_list= (gs_server_t ***)calloc(task_graph->nb_nodes , sizeof(gs_server_t **));
01923   if(!server_list){
01924     ERRPRINTF("SMART: Error allocating to server list\n");
01925     return -1;
01926   }
01927   for(i=0; i<task_graph->nb_nodes;i++){
01928     if(task_graph->task_nodes[i]->node_type==GS_SMART_TG_REM_TASK_NODE){
01929       count = gs_get_server_list(gs_agent, task_graph->task_nodes[i]->tg_rem_task_node->problem, client_criteria, &server_list[node_index], &count);
01930       if( ( !server_list[node_index] ) || ( count<=0 ) ){
01931         ERRPRINTF("SMART: Error receiving server list or no servers in list\n");
01932         return -1;
01933       }
01934       task_graph->task_nodes[i]->tg_rem_task_node->nb_servers=count;
01935       task_graph->task_nodes[i]->tg_rem_task_node->avail_servers=server_list[node_index];
01936       node_index++;
01937     }
01938   }
01939   return 0;
01940 }
01941 
01942 
01943 
01966 int
01967 gs_smart_agent_process_app_pm_model(gs_agent_t * gs_agent, int sock){
01968   gs_smart_tg * tg;
01969   gs_server_t **server_list = NULL;
01970   gs_smart_app_pm *  app_pm;
01971   char * app_pm_str=NULL;
01972   int tag, i, count;
01973   int graph_type;
01974   char * comm_type=NULL; 
01975  /*
01976    * Set graph type to the type of
01977    * task graph that should be generated.
01978    * This variable determines what dependencies
01979    * get anaylzed when generating the task graph
01980    * If set to GS_SMART_NO_DEP_GRAPH, then the
01981    * task graph will be generated with no
01982    * dependencies and therefore the mapping heuristic
01983    * is unable to generate solution which implement
01984    * remote communication.
01985    */
01986 
01987 //  graph_type=GS_SMART_NO_DEP_GRAPH;
01988 // graph_type=GS_SMART_P2P_GRAPH;
01989 //  graph_type=GS_SMART_SRV_BRDCST_GRAPH;
01990 
01991 
01992   if(gs_recv_string(sock, &(comm_type)) < 0) {
01993     ERRPRINTF("SMART: Error receiving xml info string\n");
01994     return -1;
01995   }
01996   if(strcmp(comm_type,"enable_remote_comm")==0){
01997     graph_type=GS_SMART_CL_SRV_BRDCST_GRAPH;
01998   }
01999   else if(strcmp(comm_type, "no_dep")==0){
02000     graph_type=GS_SMART_NO_DEP_GRAPH;
02001   }
02002   else if(strcmp(comm_type, "server_comm")==0){
02003     graph_type=GS_SMART_SRV_BRDCST_GRAPH;
02004   }
02005   else if(strcmp(comm_type, "disable_remote_comm")==0){
02006     graph_type=GS_SMART_CLNT_BRDCAST_COMM;
02007   }
02008   else{
02009     graph_type=GS_SMART_CL_SRV_BRDCST_GRAPH;
02010   }
02011   app_pm=(gs_smart_app_pm *)calloc(1,sizeof(gs_smart_app_pm));
02012   
02013   if(!gs_agent) {
02014     ERRPRINTF("SMART: Invalid arg: null agent\n");
02015     if(app_pm_str) free(app_pm_str);
02016     if(app_pm) gs_smart_app_pm_free(app_pm);
02017     return -1;
02018   }
02019 
02020   if(gs_recv_string(sock, &(app_pm_str)) < 0) {
02021     ERRPRINTF("SMART: Error receiving app_pm string\n");
02022     if(app_pm_str) free(app_pm_str);
02023     if(app_pm) gs_smart_app_pm_free(app_pm);
02024     return -1;
02025   }
02026   
02027   if(gs_smart_decode_app_pm(app_pm_str,app_pm)<0){
02028     ERRPRINTF("SMART: Error decoding app_pm string\n");
02029     if(app_pm_str) free(app_pm_str);
02030     if(app_pm) gs_smart_app_pm_free(app_pm);
02031     return -1;
02032   }
02033 
02034   if(app_pm_str) free(app_pm_str);
02035   tg = (gs_smart_tg *)calloc(1, sizeof(gs_smart_tg ));
02036 
02037   if(!tg){
02038     ERRPRINTF("SMART: Error allocating memory for task graph\n");
02039     if(app_pm_str) free(app_pm_str);
02040     if(app_pm) gs_smart_app_pm_free(app_pm);
02041     if(tg) gs_smart_tg_free(tg);
02042     return -1;
02043   }
02044 
02045   tag=GS_SMART_GRAPH_OK;
02046 
02047   gs_problem_t **problem_list;
02048   problem_list=(gs_problem_t **)calloc(app_pm->nb_tasks , sizeof(gs_problem_t *));
02049 
02050   if(!problem_list){
02051     ERRPRINTF("SMART: Error allocating to problem list\n");
02052     return -1;
02053   }
02054 
02055   for(i=0; i<app_pm->nb_tasks;i++){
02056     problem_list[i] = (gs_problem_t *)calloc(1, sizeof(gs_problem_t));
02057     if(!problem_list[i]){
02058       ERRPRINTF("SMART: Error allocating to problem %d\n", i);
02059       return -1;
02060     }
02061   }
02062   
02063   if(gs_smart_add_idl_info_to_app_pm(gs_agent, app_pm)<0){
02064     ERRPRINTF("SMART: Error getting problem list for application performance model\n");
02065     return -1;
02066   }
02067 
02068   if(gs_smart_tg_construct_tg(app_pm, tg, graph_type)<0){
02069     ERRPRINTF("SMART: Constructing task nodes for task graph\n");
02070      tag=GS_SMART_GRAPH_FAIL;
02071   }
02072 
02073   if(tag==GS_SMART_GRAPH_OK){
02074     if(gs_smart_add_servers_to_tg(gs_agent, tg)<0){
02075       ERRPRINTF("SMART: Error adding servers to task graph\n");
02076       tag=GS_SMART_GRAPH_FAIL;
02077     }
02078   }
02079   if(gs_send_tag(sock, tag)<0){
02080     ERRPRINTF("SMART: Error sending tag\n");
02081     if(tg) gs_smart_tg_free(tg);
02082     return -1;
02083   }
02084   if(tag==GS_SMART_GRAPH_OK){
02085     DBGPRINTF("SMART: Task graph created successfully\n");
02086   }
02087   if(tag==GS_SMART_GRAPH_FAIL){
02088     ERRPRINTF("SMART: Error creating task graph. This may be due to no servers registered\n");
02089     if(tg) gs_smart_tg_free(tg);
02090     return -1;
02091   }
02092 
02093   if(gs_smart_send_tg(sock, tg)<0){
02094     ERRPRINTF("SMART: gs_send_task_graph failed\n");
02095     if(tg) gs_smart_tg_free(tg);
02096     return -1;
02097   }
02098 
02099   count = gs_get_all_servers(gs_agent, &server_list, &count);
02100   if(count < 0) {
02101     ERRPRINTF("failed to get list of all servers\n");
02102     return -1;
02103   }
02104 
02105   if(gs_smart_send_all_servers(sock, server_list, count)<0){
02106     ERRPRINTF("SMART: Error sending server link info\n");
02107     return -1;
02108   }
02109 
02110   for(i = 0; i < count; i++)
02111     gs_server_free(server_list[i]);
02112     
02113   FREE(server_list);
02114 
02115   if(gs_send_tag(sock, GS_PROT_OK) < 0)
02116     return -1;
02117 
02118 
02119   if(app_pm){
02120     if(gs_smart_app_pm_free(app_pm)<0){
02121       ERRPRINTF("SMART : Error freeing application performance model\n");
02122       return -1;
02123     }
02124   }
02125 
02126   if(tg){
02127     if(gs_smart_tg_free(tg)<0){
02128       ERRPRINTF("SMART: Error Freeing task graph\n");
02129       return -1;
02130     }
02131   }
02132 
02133   return 0;
02134 }
02135 
02136 #endif
02137 
02138 
02139 
02140 
02141 
02142 
02153 int
02154 gs_agent_process_message(gs_agent_t * gs_agent, int sock)
02155 {
02156   char *version_str;
02157   int tag, retval;
02158 
02159   if((gs_recv_tag(sock, &tag)) < 0) {
02160     ERRPRINTF("could not get tag\n");
02161     return -1;
02162   }
02163 
02164   if((gs_recv_string(sock, &version_str)) < 0) {
02165     ERRPRINTF("could not read version string\n");
02166     return -1;
02167   }
02168 
02169   if(gs_versions_incompatible(version_str, VERSION))
02170     retval = gs_send_tag(sock, GS_PROT_VERSION_MISMATCH);
02171   else
02172     retval = gs_send_tag(sock, GS_PROT_OK);
02173 
02174   free(version_str);
02175 
02176   if(retval < 0) {
02177     ERRPRINTF("could not send response tag.\n");
02178     return -1;
02179   }
02180 
02181   if(gs_storage_init(gs_agent) < 0) {
02182     ERRPRINTF("Could not init database.\n");
02183     return -1;
02184   }
02185   
02186   retval = -1;
02187 
02188   switch (tag) {
02189     case GS_PROT_PROBLEM_SUBMIT:
02190       retval = gs_agent_process_problem_submit(gs_agent, sock);
02191       break;
02192     case GS_PROT_SERVER_REGISTER:
02193       retval = gs_agent_process_server_registration(gs_agent, sock);
02194       break;
02195     case GS_PROT_PROBLEM_REGISTER:
02196       retval = gs_agent_process_problem_registration(gs_agent, sock);
02197       break;
02198     case GS_PROT_WORKLOAD_REPORT:
02199       retval = gs_agent_process_workload_report(gs_agent, sock);
02200       break;
02201     case GS_PROT_KILL_AGENT:
02202       retval = gs_agent_process_kill_agent(gs_agent, sock);
02203       break;
02204     case GS_PROT_KILL_SERVER:
02205       retval = gs_agent_process_kill_server(gs_agent, sock);
02206       break;
02207     case GS_PROT_SERVER_LIST:
02208       retval = gs_agent_process_server_list(gs_agent, sock);
02209       break;
02210     case GS_PROT_SERVER_PING_LIST:
02211       retval = gs_agent_process_server_ping_list(gs_agent, sock);
02212       break;
02213     case GS_PROT_SERVER_PING_UPDATE:
02214       retval = gs_agent_process_server_ping_update(gs_agent, sock);
02215       break;
02216     case GS_PROT_PROBLEM_LIST:
02217       retval = gs_agent_process_problem_list(gs_agent, sock);
02218       break;
02219     case GS_PROT_PROBLEM_DESC:
02220       retval = gs_agent_process_problem_desc(gs_agent, sock);
02221       break;
02222     case GS_PROT_NOTIFY_SUBMIT:
02223       retval = gs_agent_process_notify_submit(sock);
02224       break;
02225     case GS_PROT_NOTIFY_FAILURE:
02226       retval = gs_agent_process_notify_failure(sock);
02227       break;
02228     case GS_PROT_NOTIFY_COMPLETE:
02229       retval = gs_agent_process_notify_complete(sock);
02230       break;
02231     case GS_PROT_NOTIFY_CANCEL:
02232       retval = gs_agent_process_notify_cancel(sock);
02233       break;
02234     case GS_PROT_AVAILABILITY_REQ:
02235       retval = gs_agent_process_availability_request(sock);
02236       break;
02237 #ifdef GS_SMART_GRIDSOLVE
02238     case GS_PROT_APP_PM_MODEL:
02239       retval = gs_smart_agent_process_app_pm_model(gs_agent, sock);
02240       break;
02241     case  GS_SMART_FAULT_UPDATE_PM:
02242       retval = gs_smart_fault_update_pm(gs_agent, sock);
02243       break;
02244 #endif
02245     default:
02246       LOGPRINTF("Unknown tag %d\n", tag);
02247       break;
02248   }
02249 
02250   gs_storage_finalize(gs_agent);
02251 
02252   return retval;
02253 }
02254 
02266 int
02267 gs_agent_listen_and_process_messages(gs_agent_t *gs_agent)
02268 {
02269   int sock = -1;
02270   int accept_error = 0;
02271 
02272   if(!gs_agent) {
02273     ERRPRINTF("Invalid arg: null agent\n");
02274     return -1;
02275   }
02276 
02277   /* Initialize proxy library */
02278   proxy_init("");
02279 
02280   gs_agent->sock = gs_establish_socket(&(gs_agent->port), TRUE);
02281 
02282   if(gs_agent->sock == -1) {
02283     ERRPRINTF("Could not bind to port %d \n", gs_agent->port);
02284     return (-1);
02285   }
02286   LOGPRINTF("Agent %s listening at port %d\n",
02287             gs_agent->hostname, gs_agent->port);
02288 
02289   gs_listen_on_socket(gs_agent->sock);
02290 
02291   for(;;) {
02292     pid_t pid;
02293 
02294     /* if accept fails, go back to while loop */
02295     if((sock = gs_accept_connection(gs_agent->sock)) == -1) {
02296       /* Print only one copy of this error message */
02297       if(accept_error != 1)
02298         ERRPRINTF
02299             ("Failed to accept connection on socket, return to listening \n");
02300       accept_error = 1;
02301       continue;
02302     }
02303     else {
02304       accept_error = 0;
02305     }
02306 
02307     /* increment task id before forking. */
02308     global_taskid++;
02309 
02310     pid = fork();
02311 
02312     switch(pid) {
02313       case -1:     /* error */
02314         ERRPRINTF("Failed to fork\n");
02315         gs_close_socket(sock);
02316         continue;
02317       case 0:      /* child */
02318         gs_close_socket(gs_agent->sock);
02319         gs_agent_process_message(gs_agent, sock);
02320         _exit(0);
02321       default:
02322         gs_close_socket(sock);
02323     }
02324 
02325     fflush(stderr);
02326     fflush(stdout);
02327   }
02328 
02329   return 0;
02330 }
02331 
02340 gs_agent_scheduler_t
02341 gs_agent_parse_scheduler_name(char *name)
02342 {
02343   if(!name) return GS_INVALID_SCHEDULER;
02344 
02345   if(!strcasecmp(name, "default_mct"))
02346     return GS_DEFAULT_MCT;
02347 
02348   /* if using one of the HTM schedulers, set the flag to enable
02349    * keeping track of task events (start time, end time, etc).
02350    */
02351   if(!strncasecmp(name, "htm", 3)) {
02352     keep_track_of_task_events = 1;
02353 
02354     if(!strcasecmp(name, "htm_ml"))
02355       return GS_HTM_ML;
02356 
02357     if(!strcasecmp(name, "htm_msf"))
02358       return GS_HTM_MSF;
02359 
02360     if(!strcasecmp(name, "htm_hmct"))
02361       return GS_HTM_HMCT;
02362 
02363     if(!strcasecmp(name, "htm_mp"))
02364       return GS_HTM_MP;
02365   }
02366 
02367   return GS_INVALID_SCHEDULER;
02368 }
02369 
02386 int
02387 gs_agent_parse_cmd_line(int argc, char **argv, char **logfile, int *daemon,
02388   int *httpd_port, char **config_file)
02389 {
02390   char *httpd_port_env;
02391   int c;
02392 
02393   if(!argv || !logfile || !daemon) {
02394     ERRPRINTF("Invalid args\n");
02395     return -1;
02396   }
02397 
02398   *logfile = NULL;
02399   *daemon = 1;
02400   *config_file = NULL;
02401 
02402   /* check if the GRIDSOLVE_HTTPD_PORT is set to "disable" */
02403 
02404   if((httpd_port_env = getenv("GRIDSOLVE_HTTPD_PORT")) &&
02405      (!strcmp(httpd_port_env, "disable")))
02406     *httpd_port = -1;
02407   else
02408     *httpd_port = getenv_int("GRIDSOLVE_HTTPD_PORT", 
02409          GRIDSOLVE_HTTPD_PORT_DEFAULT);
02410 
02411   /* when making changes to the command line args, update 
02412    * GS_AGENT_USAGE_STR so the usage information is printed 
02413    * correctly upon error.
02414    */
02415 
02416 #define GS_AGENT_USAGE_STR "Usage: GS_agent [-l logfile] [-c] [-w httpd port] [-s config file]"
02417 
02418   while((c = getopt(argc,argv,"cl:w:s:")) != EOF) {
02419     switch(c) {
02420       case 'w':
02421         if(!strcmp(optarg, "disable"))
02422           *httpd_port = -1;
02423         else
02424           *httpd_port = atoi(optarg);
02425         break;
02426       case 's':
02427         *config_file = strdup(optarg);
02428         break;
02429       case 'l':
02430         *logfile = strdup(optarg);
02431         break;
02432       case 'c':
02433         *daemon = 0;
02434         break;
02435       case '?':
02436         return -1;
02437         break;
02438       default:
02439         ERRPRINTF("Bad arg: '%c'.\n",c);
02440         return -1;
02441     }
02442   }
02443 
02444   /* Setup default server config file name if necessary */
02445   if(!*config_file) {
02446     char *gridsolve_root;
02447 
02448     if((gridsolve_root = getenv("GRIDSOLVE_ROOT")) == NULL)
02449       gridsolve_root = GRIDSOLVE_TOP_BUILD_DIR;
02450     if(!gridsolve_root) {
02451       ERRPRINTF("Warning: GRIDSOLVE_ROOT unknown, assuming cwd.\n");
02452       gridsolve_root = strdup(".");
02453     }
02454 
02455     *config_file = dstring_sprintf("%s/agent_config", gridsolve_root);
02456     if(!*config_file) {
02457       fprintf(stderr,"Error generating agent config file name.\n");
02458       exit(EXIT_FAILURE);
02459     }
02460   } 
02461 
02462   return 0;
02463 }
02464 
02474 int
02475 gs_spawn_http_server(gs_agent_t *gs_agent, int httpd_port)
02476 {
02477   void **httpd_args;
02478   int *port;
02479   pid_t pid;
02480 
02481   httpd_args = (void **) malloc(2 * sizeof(void *));
02482   if(!httpd_args) {
02483     ERRPRINTF("Failed to allocate memory for child args\n");
02484     return -1;
02485   }
02486 
02487   port = (int *)malloc(sizeof(int));
02488   if(!port) {
02489     ERRPRINTF("Failed to allocate memory for port arg\n");
02490     return -1;
02491   }
02492 
02493   *port = httpd_port;
02494 
02495   httpd_args[0] = port;
02496   httpd_args[1] = gs_agent;
02497 
02498   /* fork http server process */
02499   pid = mfork(gs_agent_httpd, -1, httpd_args, NULL, NULL, NULL, 10);
02500 
02501   if(pid < 0) {
02502     fprintf(stderr,"Failed to fork http server.\n");
02503     return -1;
02504   }
02505 
02506   return 0;
02507 }
02508 
02520 int
02521 gs_spawn_server_expiration(gs_agent_t *gs_agent, int timeout)
02522 {
02523   extern void gs_server_expiration_pre(void**);
02524   extern void gs_server_expiration_post(void**);
02525   void **se_args;
02526   pid_t pid;
02527   int *tout;
02528 
02529   se_args = (void **) malloc(2 * sizeof(void *));
02530   if(!se_args) {
02531     ERRPRINTF("Failed to allocate memory for child args\n");
02532     return -1;
02533   }
02534 
02535   tout = (int *) malloc(sizeof(int));
02536   if(!tout) {
02537     ERRPRINTF("Failed to allocate memory for timeout arg\n");
02538     return -1;
02539   }
02540 
02541   *tout = timeout;
02542 
02543   se_args[0] = gs_agent;
02544   se_args[1] = tout;
02545   
02546   if(gs_signal(SIGHUP, gs_temp_sighup_handler) == SIG_ERR)
02547     ERRPRINTF("error setting up temp signal handler\n");
02548 
02549   pid = mfork(gs_server_expiration, GS_EXPIRE_FREQUENCY, se_args,
02550     gs_server_expiration_pre, NULL, gs_server_expiration_post, 30);
02551 
02552   if(gs_signal(SIGHUP, gs_agent_generic_signal_handler) == SIG_ERR)
02553     ERRPRINTF("error restoring signal handler\n");
02554 
02555   if(pid < 0) {
02556     ERRPRINTF("Failed to fork server expiration process.\n");
02557     return -1;
02558   }
02559 
02560   return 0;
02561 }
02562 
02570 int
02571 gs_spawn_sensor(gs_agent_t *gs_agent)
02572 {
02573   struct sockaddr_un sensoraddr;
02574   void **sens_args;
02575   pid_t pid;
02576   int err;
02577 
02578   sens_args = (void **) malloc(2 * sizeof(void *));
02579   if(!sens_args) {
02580     ERRPRINTF("Failed to allocate memory for child args\n");
02581     return -1;
02582   }
02583   sens_args[0] = NULL;
02584   sens_args[1] = gs_agent;
02585 
02586   /* fork sensor process  */
02587   pid = mfork(gs_agent_sensor_run, -1, sens_args,
02588     gs_agent_sensor_pre, gs_agent_sensor_post,
02589     gs_agent_sensor_exit, 30);
02590 
02591   if(pid < 0) {
02592     ERRPRINTF("Failed to fork sensor process.\n");
02593     return -1;
02594   }
02595 
02596   /* connect to sensor */
02597   GS_SENSORFD = socket(PF_UNIX, SOCK_STREAM, 0);
02598   if ( GS_SENSORFD < 0 )
02599   {
02600     ERRPRINTF("Could not open socket to sensor.\n");
02601     return -1;
02602   }
02603 
02604   memset(&sensoraddr, 0x0, sizeof(struct sockaddr_un));
02605   sensoraddr.sun_family = PF_UNIX;
02606   strcpy(sensoraddr.sun_path, GRIDSOLVE_SENSOR_USOCK);
02607 
02608   err = connect(GS_SENSORFD, (struct sockaddr*) &sensoraddr,
02609                 sizeof(struct sockaddr_un));
02610   if ( err < 0 )
02611   {
02612     ERRPRINTF("Warning: Connect on sensor socket failed.");
02613     ERRPRINTF("  logging events to stdout.\n");
02614     GS_SENSORFILE = stdout;
02615   }
02616   else {
02617     GS_SENSORFILE = fdopen(GS_SENSORFD, "w+");
02618     LOGPRINTF("Connected to sensor.\n");
02619   }
02620 
02621   return 0;
02622 }
02623 
02634 int
02635 main(int argc, char **argv)
02636 {
02637   gs_agent_t *gs_agent = NULL;
02638   int status = -1, daemon;
02639   int httpd_port;
02640   char *logfile, *gridsolve_root;
02641 
02642   gs_agent_scheduler_selection = GS_DEFAULT_MCT;
02643 
02644   gs_setup_signal_handlers(gs_agent_generic_signal_handler);
02645 
02646   if((gridsolve_root = getenv("GRIDSOLVE_ROOT")) == NULL) 
02647     gridsolve_root = GRIDSOLVE_TOP_BUILD_DIR;
02648 
02649   if(!gridsolve_root) {
02650     ERRPRINTF("Warning: GRIDSOLVE_ROOT unknown, assuming cwd.\n");
02651     gridsolve_root = strdup(".");
02652   }
02653 
02654   if(gs_agent_parse_cmd_line(argc, argv, &logfile, &daemon, &httpd_port,
02655        &agent_cfg) < 0) {
02656     fprintf(stderr, "%s\n", GS_AGENT_USAGE_STR);
02657     exit(EXIT_FAILURE);
02658   }
02659 
02660   if(!logfile) {
02661     logfile = dstring_sprintf("%s/gs_agent.log", gridsolve_root);
02662     if(!logfile) {
02663       ERRPRINTF("Error generating log file name.\n");
02664       exit(EXIT_FAILURE);
02665     }
02666   }
02667 
02668   if(daemon && gs_daemon_init(gridsolve_root, logfile) < 0) {
02669     fprintf(stderr, "Failed to start agent as a daemon.\n");
02670     exit(EXIT_FAILURE);
02671   }
02672 
02673   gs_agent = gs_agent_init(agent_cfg);
02674   if(!gs_agent) {
02675     ERRPRINTF("Failed to initialize agent\n");
02676     exit(EXIT_FAILURE);
02677   }
02678 
02679   if(gs_agent_scheduler_selection == GS_INVALID_SCHEDULER) {
02680     fprintf(stderr, "Invalid scheduler type specified.  Legal values are:\n");
02681     fprintf(stderr, "  default_mct -- original gridsolve scheduler\n");
02682     fprintf(stderr, "  htm_ml -- HTM minimum length\n");
02683     fprintf(stderr, "  htm_msf -- HTM minimum sumflow\n");
02684     fprintf(stderr, "  htm_hmct -- HTM historical minimum completion time\n");
02685     fprintf(stderr, "  htm_mp -- HTM minimum perturbation\n");
02686     exit(EXIT_FAILURE);
02687   }
02688 
02689   gs_log_init(argv[0], -1, -1, "agent");
02690 
02691   /* if using MySQL, no need to spawn the db manager (for sqlite) */
02692 #ifdef GS_USE_MYSQL
02693   if(gs_mysql_init_db(gs_agent) < 0) {
02694     printf("MySQL db could not be initialized\n");
02695     exit(-1);
02696   }
02697 #else
02698   if(gs_init_db(&global_db) < 0) {
02699     ERRPRINTF("SQLite could not be initialized\n");
02700     exit(EXIT_FAILURE);
02701   }
02702 #endif
02703 
02704   if(gs_spawn_sensor(gs_agent) < 0) {
02705     ERRPRINTF("Failed to spawn sensor\n");
02706     exit(EXIT_FAILURE);
02707   }
02708 
02709   if(gs_spawn_server_expiration(gs_agent, GS_EXPIRE_TIMEOUT) < 0) {
02710     ERRPRINTF("Failed to spawn server expiration process\n");
02711     exit(EXIT_FAILURE);
02712   }
02713 
02714   if(httpd_port >= 0)
02715     if(gs_spawn_http_server(gs_agent, httpd_port) < 0) {
02716       ERRPRINTF("Failed to spawn http server\n");
02717       exit(EXIT_FAILURE);
02718     }
02719 
02720   status = gs_agent_listen_and_process_messages(gs_agent);
02721 
02722   LOGPRINTF("Agent %s exiting with status %d\n", gs_agent->hostname, status);
02723 
02724   exit(status);
02725 }
02726 
02736 int
02737 gs_agent_init_conns(gs_agent_conn_t *conns)
02738 {
02739   int i;
02740 
02741   if(!conns) {
02742     ERRPRINTF("Invalid arg: null connections ptr\n");
02743     return -1;
02744   }
02745   
02746   for(i=0; i < GS_AGENT_MAX_FD; i++)
02747     conns->fd[i] = 0; 
02748   conns->maxfd = 0;
02749     
02750   return 0;
02751 }
02752 
02762 int
02763 gs_agent_add_conn(gs_agent_conn_t *conns, int fd)
02764 {
02765   if(!conns) {
02766     ERRPRINTF("Invalid arg: null connections ptr\n");
02767     return -1;
02768   } 
02769     
02770   if(fd > GS_AGENT_MAX_FD) 
02771     return -1;
02772 
02773   conns->fd[fd] = 1;
02774 
02775   if(conns->maxfd < fd)
02776     conns->maxfd = fd;
02777 
02778   return 0;
02779 }
02780 
02790 int
02791 gs_agent_del_conn(gs_agent_conn_t *conns, int fd)
02792 {
02793   int i;
02794 
02795   if(!conns) {
02796     ERRPRINTF("Invalid arg: null connections ptr\n");
02797     return -1;
02798   }
02799 
02800   conns->fd[fd] = 0;
02801 
02802   if(fd == conns->maxfd) {
02803     for(i=fd;i>=0;i--)
02804       if(conns->fd[i]) {
02805         conns->maxfd = i;
02806         break;
02807       }
02808 
02809     if(i < 0)
02810       conns->maxfd = 0;
02811   }
02812 
02813   return 0;
02814 }
02815 
02830 int
02831 gs_agent_setup_fd_sets(gs_agent_conn_t *connections, int listensock,
02832   int extrasock, fd_set *allset, int *maxfd)
02833 {
02834   int i;
02835 
02836   if(!connections || !allset || !maxfd) {
02837     ERRPRINTF("Invalid args\n");
02838     return -1;
02839   }
02840 
02841   *maxfd = extrasock;
02842   if(listensock > *maxfd)
02843     *maxfd = listensock;
02844 
02845   FD_ZERO(allset);
02846   if(extrasock >= 0)
02847     FD_SET(extrasock, allset);
02848   if(listensock >= 0)
02849     FD_SET(listensock, allset);
02850 
02851   for(i=0; i <= connections->maxfd; i++) {
02852     if(connections->fd[i]) {
02853       FD_SET(i, allset);
02854       if(i > *maxfd)
02855         *maxfd = i;
02856     }
02857   }
02858 
02859   return 0;
02860 }