service_template.c

Go to the documentation of this file.
00001 
00007 /* $Id: service_template.c,v 1.69 2008/08/28 01:04:54 tbrady Exp $ */
00008 /* $UTK_Copyright: $ */
00009 
00010 #include <stdio.h>
00011 #include <time.h>
00012 #include <string.h>
00013 #include <sys/types.h>
00014 #include <sys/stat.h>
00015 #include <sys/wait.h>
00016 #include <fcntl.h>
00017 #include <errno.h>
00018 #include <unistd.h>
00019 #include <sys/param.h>
00020 
00021 #include "utility.h"
00022 #include "problem.h"
00023 #include "comm_protocol.h"
00024 #include "comm_basics.h"
00025 #include "comm_data.h"
00026 #include "comm_encode.h"
00027 #include "gs_pm_model.h"
00028 
/*
 * Request ID layout.
 *
 * service_template() expands REQUEST_ID_TEMPLATE with the server component
 * ID (%s) and the pid of the service process (%d) into a buffer of
 * REQUEST_ID_LEN bytes, then hands the result to gs_create_request_id().
 *
 * Two other places depend on this format and must be updated together
 * with it if the layout changes substantially:
 *   - gs_parse_pid_from_requestid(), which extracts the pid from the ID;
 *   - the glob pattern in workload_report.c.
 */
#define REQUEST_ID_LEN 64
#define REQUEST_ID_TEMPLATE "gsrequest_%s_%d_XXXXXXXXXXXX"
00037 
00038 /* this struct holds all the relevant information about the service to
00039  * be executed.
00040  */
00041 typedef struct {
00042   gs_problem_t *problem;
00043   char *gridsolve_root;
00044   char *gridsolve_arch;
00045   int blocking;
00046   int sock;
00047   int tag;
00048   int my_dsig;
00049   char *srv_cid;
00050   char srv_job_count[FN_LEN];
00051   char *cwd;
00052   char request_id[REQUEST_ID_LEN];
00053   char *agent;
00054   in_port_t agentport;
00055   char *cli_username;
00056   char *cli_hostname;
00057   char *cli_cid;
00058   int client_dsig;
00059   char *problem_name;
00060   gs_service_error_enum_t err;
00061   char *bmode;
00062   char *infodir;
00063   int agent_taskid;
00064   double agent_est_time;
00065 } gs_service_info_t;
00066 
00067 int 
00068   gs_read_server_from_file(char *, gs_server_t *),
00069   gs_service_read_coeff(gs_service_info_t *, gs_server_t *),
00070   gs_problem_service(gs_problem_t *),
00071   gs_service_blocking_request(gs_service_info_t *),
00072   gs_service_nonblocking_request(gs_service_info_t *),
00073   gs_service_batch_request(gs_service_info_t *);
00074 
00075 void 
00076   gs_dummy_signal_handler(int),
00077   gs_dummy_signal_handler(int),
00078   gs_service_sigterm_handler(int);
00079 
00080 double
00081   gs_read_service_et(char *),
00082   gs_pm_problem_service(gs_service_info_t *),
00083   gs_agent_get_server_score(gs_problem_t *, gs_server_t *);
00084 
00085 /* This holds the pid of the child that is forked to service the request.
00086  * when grpc_cancel() is called, the request id only has encoded the
00087  * pid of the first child, not the second.  So, for things to terminate
00088  * cleanly we need to keep track of and kill the second child also.
00089  */
00090 pid_t gs_service_pid = 0;
00091 
00136 int
00137 service_template(int argc, char *argv[])
00138 {
00139   gs_service_info_t sinfo;
00140   gs_server_t *server;
00141   char *service_xml;
00142   double est_time;
00143   char *cwd;
00144 
00145   sinfo.err = GS_SVC_ERR_UNSPECIFIED;
00146 
00147   if(argc != 17) {
00148     fprintf(stderr, "Bad usage.  Anyway, don't use this\n");
00149     fprintf(stderr, "from the command line.\n");
00150     exit(-1);
00151   }
00152 
00153   sinfo.problem_name = strdup(argv[1]);
00154   sinfo.tag = atoi(argv[2]);
00155   sinfo.client_dsig = atoi(argv[3]);
00156   sinfo.sock = atoi(argv[4]);
00157   sinfo.gridsolve_root = argv[5];
00158   sinfo.gridsolve_arch = argv[6];
00159   sinfo.blocking = atoi(argv[7]);
00160   sinfo.agent = strdup(argv[8]);
00161   sinfo.agentport = atoi(argv[9]);
00162   sinfo.srv_cid = strdup(argv[10]);
00163   sinfo.cli_username = strdup(argv[11]);
00164   sinfo.cli_hostname = strdup(argv[12]);
00165   sinfo.cli_cid = strdup(argv[13]);
00166   sinfo.infodir = strdup(argv[14]);
00167   sinfo.agent_taskid = atoi(argv[15]);
00168   sinfo.agent_est_time = atof(argv[16]);
00169 
00170   server = (gs_server_t *) malloc(sizeof(gs_server_t));
00171 
00172   if(!server) {
00173     gs_send_tag(sinfo.sock, GS_SVC_ERR_MALLOC);
00174     exit(-1);
00175   }
00176 
00177   if(gs_service_read_coeff(&sinfo, server) < 0) {
00178     free(server);
00179     server = NULL;
00180   }
00181 
00182   service_xml = dstring_sprintf("%s/service/%s/%s.xml", sinfo.gridsolve_root,
00183                                 sinfo.problem_name, sinfo.problem_name);
00184 
00185   if(!service_xml) {
00186     gs_send_tag(sinfo.sock, GS_SVC_ERR_MALLOC);
00187     exit(-1);
00188   }
00189 
00190   sinfo.problem = (gs_problem_t *) malloc(sizeof(gs_problem_t));
00191 
00192   if(!sinfo.problem) {
00193     gs_send_tag(sinfo.sock, GS_SVC_ERR_MALLOC);
00194     exit(-1);
00195   }
00196 
00197   snprintf(sinfo.srv_job_count, FN_LEN, "%s/%s.%d", sinfo.infodir,
00198     GS_SERVER_JOB_COUNT_FILE_PREFIX, getppid());
00199 
00200   /* Look for the service corresponding to the requested problem */
00201   if(gs_read_problem_from_file(service_xml, sinfo.problem) < 0) {
00202     ERRPRINTF("Error loading service: '%s'.\n", service_xml);
00203     gs_send_tag(sinfo.sock, GS_SVC_ERR_MISSING_XML);
00204     exit(-1);
00205   }
00206   else {
00207     sinfo.my_dsig = pvmgetdsig();
00208 
00209     cwd = CALLOC(MAXPATHLEN, sizeof(char));
00210     if (cwd == NULL) exit(-1);
00211     if (getcwd(cwd, MAXPATHLEN) == NULL) exit(-1);
00212     sinfo.cwd = strdup(cwd);
00213     FREE(cwd);
00214 
00215     if (!sinfo.cwd) {
00216       ERRPRINTF("Can't get current working directory.\n");
00217       gs_send_tag(sinfo.sock, GS_SVC_ERR_GETCWD);
00218       exit(-1);
00219     }
00220 
00221     sprintf(sinfo.request_id, REQUEST_ID_TEMPLATE, sinfo.srv_cid,
00222        (int) getpid());
00223 
00224     if(gs_create_request_id(sinfo.request_id) < 0) {
00225       ERRPRINTF("Error creating request id.\n");
00226       gs_send_tag(sinfo.sock, GS_SVC_ERR_REQID);
00227       exit(-1);
00228     }
00229 
00230     if(mkdir(sinfo.request_id, 0700) < 0) {
00231       ERRPRINTF("Could not create directory '%s' ", sinfo.request_id);
00232       ERRPRINTF("to store output (cwd = '%s')\n", sinfo.cwd);
00233       gs_send_tag(sinfo.sock, GS_SVC_ERR_MKDIR);
00234       exit(-1);
00235     }
00236 
00237     if(chdir(sinfo.request_id) < 0) {
00238       ERRPRINTF("Could not cd to request directory '%s'.\n", sinfo.request_id);
00239       gs_send_tag(sinfo.sock, GS_SVC_ERR_CHDIR);
00240       exit(-1);
00241     }
00242 
00243     if(gs_increment_job_count(sinfo.srv_job_count) < 0)
00244       ERRPRINTF("Warning: failed to increment job count.\n");
00245 
00246     if(gs_send_tag(sinfo.sock, GS_PROT_OK) < 0) {
00247       ERRPRINTF("Error sending GS_PROT_OK.\n");
00248       goto service_abnormal_exit;
00249     }
00250 
00251     if(gs_send_string(sinfo.sock, sinfo.request_id) < 0) {
00252       ERRPRINTF("Error sending request id.\n");
00253       goto service_abnormal_exit;
00254     }
00255 
00256     /* now, if this is an assigned server request, send the problem
00257        description back to the client. */
00258 
00259     if(sinfo.tag == GS_PROT_PROBLEM_SOLVE_ASSIGNED) {
00260       char *problemstring = NULL;
00261       char dsig_string[256];
00262 
00263       sprintf(dsig_string, "%d", sinfo.my_dsig);
00264 
00265       if(gs_send_string(sinfo.sock, dsig_string) < 0) {
00266         ERRPRINTF("Error sending server data signature.\n");
00267         goto service_abnormal_exit;
00268       }
00269 
00270       if(gs_encode_problem(&problemstring, sinfo.problem) < 0) {
00271         ERRPRINTF("Error encoding problem description.\n");
00272         goto service_abnormal_exit;
00273       }
00274 
00275       if(gs_send_string(sinfo.sock, problemstring) < 0) {
00276         ERRPRINTF("Error sending problem description.\n");
00277         goto service_abnormal_exit;
00278       }
00279     }
00280 
00281 
00282 #ifdef GS_SMART_GRIDSOLVE
00283     if(gs_recv_int(sinfo.sock, &sinfo.problem->has_smart_arg_comm) < 0) {
00284       ERRPRINTF("Error sending problem description.\n");
00285       goto service_abnormal_exit;
00286     }
00287 
00288     if(sinfo.problem->has_smart_arg_comm==1){
00289       if(gs_smart_recv_map_info(sinfo.sock, sinfo.problem)<0){
00290         ERRPRINTF("Error receiving remote comm info.\n");
00291         goto service_abnormal_exit;
00292       }
00293     }
00294 
00295     if(sinfo.problem->has_smart_arg_comm==1){
00296       if(gs_smart_recv_input_args(sinfo.sock, server, sinfo.problem, sinfo.client_dsig, sinfo.my_dsig)<0){         
00297         ERRPRINTF("SMART: Error receiving smart input args.\n");
00298         goto service_abnormal_exit;
00299       }
00300     }
00301     else{
00302       if(gs_recv_input_args(sinfo.sock, sinfo.problem, sinfo.client_dsig, sinfo.my_dsig) < 0) {
00303         ERRPRINTF("Error receiving input args.\n");
00304         goto service_abnormal_exit;
00305       }
00306 
00307     }
00308 
00309 #else
00310 
00311 
00312 
00313     if(gs_recv_input_args(sinfo.sock, sinfo.problem, sinfo.client_dsig, sinfo.my_dsig) < 0) {
00314       ERRPRINTF("Error receiving input args.\n");
00315       goto service_abnormal_exit;
00316     }
00317 #endif
00318 
00319     if(server)
00320       est_time = gs_agent_get_server_score(sinfo.problem, server);
00321     else
00322       est_time = 2000.0;
00323 
00324     if(gs_notify_agent_problem_solve(sinfo.agent, sinfo.agentport, 
00325          sinfo.problem_name, est_time, sinfo.srv_cid, sinfo.cli_username, 
00326          sinfo.cli_hostname, sinfo.cli_cid, sinfo.request_id, 
00327          sinfo.agent_taskid, sinfo.agent_est_time) < 0)
00328       ERRPRINTF("Warning: failed sending problem solve notification.\n");
00329 
00330     sinfo.bmode = gs_problem_getinfo(sinfo.problem, "BATCH_SUBMIT", NULL);
00331 
00332     if(sinfo.bmode) {
00333       if(gs_service_batch_request(&sinfo) < 0) {
00334         gs_send_tag(sinfo.sock, sinfo.err);
00335         goto service_abnormal_exit;
00336       }
00337     }
00338     else if(sinfo.blocking) {
00339       if(gs_service_blocking_request(&sinfo) < 0)
00340         goto service_abnormal_exit;
00341     }
00342     else {
00343       if(gs_service_nonblocking_request(&sinfo) < 0)
00344         goto service_abnormal_exit;
00345     }
00346   }
00347 
00348   gs_close_socket(sinfo.sock);
00349   exit(0);
00350 
00351 service_abnormal_exit:
00352   /* for non-blocking requests, create a file whose name contains
00353    * the error code so we'll remember why the service failed when
00354    * the client connects back to wait for it to complete.
00355    */
00356   if(!sinfo.blocking || sinfo.bmode) {
00357     if(gs_decrement_job_count(sinfo.srv_job_count) < 0)
00358       ERRPRINTF("Warning: failed to decrement job count.\n");
00359 
00360     if(gs_create_error_file(".", sinfo.err) < 0)
00361       ERRPRINTF("Could not create 'error' file.\n");
00362   }
00363 
00364   /* if something goes wrong, write a "cancelled" file to the
00365    * request subdirectory so that it'll get cleaned up.
00366    */
00367   if(gs_create_timestamp_file(".", "cancelled", 0.0) < 0)
00368     ERRPRINTF("Could not create 'cancelled' file.\n");
00369   ERRPRINTF("Service terminating abnormally\n");
00370   exit(-1);
00371 }
00372 
/**
 * Signal handler that deliberately does nothing.  The request handlers in
 * this file install it for SIGCHLD so the signal is caught rather than
 * ignored.
 *
 * @param sig the delivered signal number (unused)
 */
void
gs_dummy_signal_handler(int sig)
{
  (void)sig;
}
00387 
00396 void
00397 gs_service_sigterm_handler(int sig)
00398 {
00399   if(gs_service_pid > 0) {
00400     if(kill(gs_service_pid, sig) < 0)
00401       ERRPRINTF("Failed to kill service process [pid = %d]\n", gs_service_pid);
00402   }
00403 
00404   return;
00405 }
00406 
00416 void
00417 gs_batch_service_sigterm_handler(int sig)
00418 {
00419   if(gs_service_pid > 0) {
00420     if(kill(gs_service_pid, sig) < 0)
00421       ERRPRINTF("Failed to kill batch service process [pid = %d]\n", gs_service_pid);
00422   }
00423 
00424   return;
00425 }
00426 
00437 int
00438 gs_service_read_coeff(gs_service_info_t *s, gs_server_t *server)
00439 {
00440   char *server_xml, *service_coeff, *service_model;
00441   int mfd;
00442 
00443   server_xml = dstring_sprintf(GS_SERVER_XML_TEMPLATE, s->infodir);
00444 
00445   if(!server_xml) {
00446     s->err = GS_SVC_ERR_MALLOC;
00447     return -1;
00448   }
00449 
00450   if(gs_read_server_from_file(server_xml, server) < 0) {
00451     s->err = GS_SVC_ERR_MISSING_SV_XML;
00452     return -1;
00453   }
00454 
00455   server->workload = gs_get_workload();
00456 
00457   service_model = dstring_sprintf("%s/%s.mdl", s->infodir, s->problem_name);
00458 
00459   if(!service_model) {
00460     s->err = GS_SVC_ERR_MALLOC;
00461     return -1;
00462   }
00463 
00464   service_coeff = dstring_sprintf("%s/%s.coe", s->infodir, s->problem_name);
00465 
00466   if(!service_coeff) {
00467     s->err = GS_SVC_ERR_MALLOC;
00468     return -1;
00469   }
00470 
00471   mfd = open(service_model, O_RDONLY, 0600);
00472 
00473   if(mfd < 0) {
00474     server->perf_expr = strdup(GS_NO_MODEL_UPDATE);
00475     return 0;
00476   }
00477 
00478   /* note we're obtaining a lock on the model file, not the
00479    * coefficient file.  the model file is the one that will
00480    * be locked by the service processes when both files are
00481    * updated.
00482    */
00483 
00484   if(gs_lock_fd(mfd, F_RDLCK) < 0) {
00485     close(mfd);
00486     server->perf_expr = strdup(GS_NO_MODEL_UPDATE);
00487     return 0;
00488   }
00489 
00490   if(gs_get_contents_of_file(service_coeff, &(server->perf_expr)) < 0) {
00491     ERRPRINTF("Warning: failed to read coefficient file '%s'\n", service_coeff);
00492     gs_unlock_fd(mfd);
00493     close(mfd);
00494     server->perf_expr = strdup(GS_NO_MODEL_UPDATE);
00495     return 0;
00496   }
00497 
00498   if(server->perf_expr[strlen(server->perf_expr)-1] == '\n')
00499     server->perf_expr[strlen(server->perf_expr)-1] = 0;
00500 
00501   gs_unlock_fd(mfd);
00502   close(mfd);
00503 
00504   return 0;
00505 }
00506 
00515 int
00516 gs_service_blocking_request(gs_service_info_t *s)
00517 {
00518   double service_et;
00519 
00520   service_et = gs_pm_problem_service(s);
00521 
00522   if(gs_decrement_job_count(s->srv_job_count) < 0)
00523     ERRPRINTF("Warning: failed to decrement job count.\n");
00524 
00525   /* since the service might have changed the working directory
00526    * try to chdir back to the request subdirectory
00527    */
00528   if((chdir(s->cwd) < 0) || (chdir(s->request_id) < 0)) {
00529     char *origcwd, *newcwd;
00530 
00531     ERRPRINTF("Could not cd back to request directory '%s/%s'.\n",
00532       s->cwd, s->request_id);
00533     gs_send_tag(s->sock, GS_SVC_ERR_CHDIR);
00534 
00535     /* check whether working directory has changed.  if not,
00536      * goto service_abnormal_exit so that we can attempt to
00537      * write the cancelled file.  otherwise, just exit without
00538      * writing since we'd be writing it in the wrong location.
00539      */
00540     origcwd = dstring_sprintf("%s/%s", s->cwd, s->request_id);
00541     newcwd = getcwd(NULL, MAXPATHLEN);
00542 
00543     if(!strcmp(newcwd, origcwd))
00544       return -1;
00545 
00546     exit(-1);
00547   }
00548 
00549   if(gs_send_tag(s->sock, GS_PROT_OK) < 0) {
00550     ERRPRINTF("Error sending tag.\n");
00551     return -1;
00552   }
00553 
00554 
00555 
00556 #ifdef GS_SMART_GRIDSOLVE
00557   int pid;
00558     if(s->problem->has_smart_arg_comm==1){
00559       if(gs_smart_send_output_args_to_client(s->sock ,s->problem, s->my_dsig)<0){
00560         ERRPRINTF("SMART : Error sending smart sending arguments\n");
00561         return -1;
00562       }
00563       pid=fork();
00564       if(pid==-1){
00565         ERRPRINTF("SMART: Out of memory could not fork\n");
00566         return -1;
00567       }
00568 
00569       if(pid==0){ 
00570        gs_server_t * src_server = (gs_server_t *)calloc(1,sizeof(gs_server_t));
00571        if(gs_service_read_coeff(s, src_server) < 0) {
00572          free(src_server);
00573          src_server = NULL;
00574        }
00575 
00576         if(gs_smart_send_output_args_remotely(s->sock ,src_server, s->problem, s->my_dsig)<0){
00577           ERRPRINTF("SMART : Error sending smart sending arguments\n");
00578           return -1;
00579         }
00580         _exit(0);
00581       }
00582     }
00583     else{
00584       if(gs_send_output_args(s->sock, s->problem, s->my_dsig) < 0) {
00585         ERRPRINTF("Error sending output args.\n");
00586         return -1;
00587       }
00588     }
00589 #else
00590   if(gs_send_output_args(s->sock, s->problem, s->my_dsig) < 0) {
00591     ERRPRINTF("Error sending output args.\n");
00592     return -1;
00593   }
00594 #endif
00595 
00596 
00597 
00598   if(gs_notify_agent_problem_complete(s->agent, s->agentport, s->problem_name,
00599        s->srv_cid, s->cli_username, s->cli_hostname, s->cli_cid, s->request_id,
00600        s->agent_taskid, service_et) < 0)
00601     ERRPRINTF("Warning: failed sending problem solve notification.\n");
00602 
00603   if(gs_create_timestamp_file(".", "retrieved", 0.0))
00604     ERRPRINTF("Warning: failed to create 'retrieved' file.\n");
00605 
00606   return 0;
00607 }
00608 
00617 int
00618 gs_service_nonblocking_request(gs_service_info_t *s)
00619 {
00620   char *problemstr = NULL;
00621   FILE *xmlfile;
00622   double service_et;
00623   pid_t pid;
00624   int fd;
00625 
00626   gs_service_pid = 0;
00627 
00628   /* make sure SIGCHLD is caught so that it is not delivered to 
00629    * the mfork library.  I tried ignoring it (SIG_IGN) but then
00630    * waitpid() failed on some systems.
00631    */
00632 
00633   if((gs_signal(SIGCHLD, gs_dummy_signal_handler) == SIG_ERR) ||
00634      (gs_signal(SIGTERM, gs_service_sigterm_handler) == SIG_ERR)) {
00635     ERRPRINTF("Error: could not set signal handlers\n");
00636     s->err = GS_SVC_ERR_SIGNALS;
00637     return -1;
00638   }
00639 
00640   /* fork a child process to execute the service */
00641 
00642   pid = fork();
00643 
00644   if(pid == -1) {
00645     ERRPRINTF("Failed to fork\n");
00646     s->err = GS_SVC_ERR_FORK;
00647     return -1;
00648   }
00649 
00650   if(pid == 0) {
00651     /* this is the child.  execute the service and save the results. */
00652 
00653     setbuf(stdout, NULL);
00654     setbuf(stderr, NULL);
00655 
00656     service_et = gs_pm_problem_service(s);
00657 
00658     if((chdir(s->cwd) < 0) || (chdir(s->request_id) < 0)) {
00659       ERRPRINTF("Could not cd back to request directory '%s/%s'.\n", 
00660         s->cwd, s->request_id);
00661       _exit(GS_SVC_ERR_CHDIR);
00662     }
00663 
00664     xmlfile = fopen("problem.xml", "w");
00665 
00666     if(!xmlfile) {
00667       ERRPRINTF("Could not create xml file.\n");
00668       _exit(GS_SVC_ERR_CREATE_XML);
00669     }
00670 
00671     if(gs_encode_problem(&problemstr, s->problem) < 0) {
00672       ERRPRINTF("Could not encode problem.\n");
00673       _exit(GS_SVC_ERR_PROBLEM_ENC);
00674     }
00675 
00676     fprintf(xmlfile, "%s\n", problemstr);
00677 
00678     fclose(xmlfile);
00679 
00680     fd = open("data", O_WRONLY | O_CREAT, 0600);
00681 
00682     if(fd < 0) {
00683       ERRPRINTF("Could not create data file.\n");
00684       _exit(GS_SVC_ERR_CREAT_DATA_FILE);
00685     }
00686 
00687 
00688 #ifdef GS_SMART_GRIDSOLVE
00689   if(s->problem->has_smart_arg_comm==1){
00690    gs_server_t * src_server = (gs_server_t *)calloc(1,sizeof(gs_server_t));
00691    if(gs_service_read_coeff(s, src_server) < 0) {
00692      free(src_server);
00693      src_server = NULL;
00694     }
00695     if(gs_smart_save_output_args_to_file(s->sock, src_server, fd, s->problem, s->my_dsig) < 0) {
00696       ERRPRINTF("Error sending output args.\n");
00697       _exit(GS_SVC_ERR_IO);
00698     }
00699     
00700   }
00701   else{
00702     if(gs_save_output_args_to_file(fd, s->problem, s->my_dsig) < 0) {
00703       ERRPRINTF("Error sending output args.\n");
00704       _exit(GS_SVC_ERR_IO);
00705     }
00706   }
00707 
00708 #else
00709     if(gs_save_output_args_to_file(fd, s->problem, s->my_dsig) < 0) {
00710       ERRPRINTF("Error sending output args.\n");
00711       _exit(GS_SVC_ERR_IO);
00712     }
00713 
00714 #endif
00715 
00716     close(fd);
00717 
00718     if(gs_create_timestamp_file(".", "done", service_et) < 0) {
00719       ERRPRINTF("Could not create completion file.\n");
00720       _exit(GS_SVC_ERR_COMPLETION_FILE);
00721     }
00722 
00723     _exit(0);
00724   }
00725   else {
00726     pid_t child;
00727     int cstat_loc, status;
00728 
00729     gs_service_pid = pid;
00730 
00731     /* this is the parent.  wait for the child (service) to complete
00732      * and check its status to determine if it was successful or not.
00733      */
00734 
00735     child = waitpid(pid, &cstat_loc, 0);
00736 
00737     if(child < 0) {
00738       ERRPRINTF("Error waiting for service process %d.\n", (int)pid);
00739       s->err = GS_SVC_ERR_WAITPID;
00740       return -1;
00741     }
00742 
00743     if(WIFEXITED(cstat_loc) == 0) {
00744       ERRPRINTF("service process %d did not terminate.\n", (int)pid);
00745       s->err = GS_SVC_ERR_ABNORMAL_EXIT;
00746       return -1;
00747     }
00748 
00749     status = WEXITSTATUS(cstat_loc);
00750 
00751     if(status != 0) {
00752       ERRPRINTF("service process %d terminated abnormally (status %d).\n",
00753         (int)pid, (char)status);
00754       s->err = (char)status > 0 ? (char)status : GS_SVC_ERR_UNSPECIFIED;
00755       return -1;
00756     }
00757 
00758     service_et = gs_read_service_et("done");
00759 
00760     if(gs_decrement_job_count(s->srv_job_count) < 0)
00761       ERRPRINTF("Warning: failed to decrement job count.\n");
00762 
00763     if(gs_notify_agent_problem_complete(s->agent, s->agentport, s->problem_name,
00764          s->srv_cid, s->cli_username, s->cli_hostname, s->cli_cid, s->request_id,
00765          s->agent_taskid, service_et) < 0)
00766       ERRPRINTF("Warning: failed sending problem solve notification.\n");
00767   }
00768 
00769   return 0;
00770 }
00771 
/**
 * Reads the service elapsed time from a timestamp file.  The first line
 * of the file (the timestamp) is skipped and the second line is parsed
 * as a floating point number.
 *
 * @param file name of the file to read
 *
 * @return the value parsed from the second line, or 0.0 if the file
 *         cannot be opened or has no second line.
 */
double
gs_read_service_et(char *file)
{
  char line[128];
  double elapsed = 0.0;
  FILE *fp;

  fp = fopen(file, "r");

  if(fp == NULL)
    return elapsed;

  /* first line holds the timestamp; read it only to get past it */
  (void)fgets(line, (int)sizeof(line), fp);

  if(fgets(line, (int)sizeof(line), fp) != NULL)
    elapsed = atof(line);

  fclose(fp);

  return elapsed;
}
00800 
00807 int
00808 gs_exec_batch_service(gs_service_info_t *s)
00809 {
00810   int status;
00811   char *cmd, *orig_exe, *new_exe;
00812 
00813   unlink("gs_batch_id");
00814 
00815 #ifdef __CYGWIN__
00816   cmd = dstring_sprintf("%s/service/%s/gs_submit %s/service/%s/%s_batch_service > gs_batch_id", 
00817             s->gridsolve_root, s->problem_name, s->gridsolve_root, s->problem_name, s->problem_name);
00818 #else
00819   new_exe = dstring_sprintf("%s/%s/%s_batch_service", s->cwd, 
00820     s->request_id, s->problem_name);
00821   orig_exe = dstring_sprintf("%s/service/%s/%s_batch_service", 
00822     s->gridsolve_root, s->problem_name, s->problem_name);
00823 
00824   if(symlink(orig_exe, new_exe) < 0) {
00825     ERRPRINTF("failed to create symlink (%s -> %s)\n",
00826       new_exe, orig_exe);
00827     return -1;
00828   }
00829   cmd = dstring_sprintf("%s/service/%s/gs_submit %s > gs_batch_id", 
00830     s->gridsolve_root, s->problem_name, new_exe);
00831 #endif
00832 
00833   if(!cmd) {
00834     ERRPRINTF("failed to create command string\n");
00835     return -1;
00836   }
00837 
00838   DBGPRINTF("cmd: %s\n", cmd);
00839   status = system(cmd);
00840 
00841   if((status < 0) || (WEXITSTATUS(status) != 0)) {
00842     ERRPRINTF("command failed: '%s'\n", cmd);
00843     return -1;
00844   }
00845 
00846   return 0;
00847 }
00848 
00853 int
00854 gs_wait_for_batch_job_completion(gs_service_info_t *s)
00855 {
00856   char buf[256], *cmd;
00857   int status;
00858   FILE *f;
00859 
00860   /* just in case the submit script changed the current directory,
00861    * change it back to the request dir.
00862    */
00863   if((chdir(s->cwd) < 0) || (chdir(s->request_id) < 0)) {
00864     ERRPRINTF("can't cd back to %s/%s\n", s->cwd, s->request_id);
00865     return -1;
00866   }
00867 
00868   if((f = fopen("gs_batch_id", "r")) == NULL) {
00869     ERRPRINTF("failed to open file gs_batch_id\n");
00870     return -1;
00871   }
00872 
00873   if(!fgets(buf, 256, f)) {
00874     ERRPRINTF("failed to read ID from file gs_batch_id\n");
00875     return -1;
00876   }
00877 
00878   fclose(f);
00879 
00880   buf[strlen(buf)-1] = '\0';
00881 
00882   cmd = dstring_sprintf("%s/service/%s/gs_probe %s", s->gridsolve_root, s->problem_name, buf);
00883 
00884   if(!cmd) {
00885     ERRPRINTF("malloc");
00886     return -1;
00887   }
00888  
00889   for(;;) {
00890     status = system(cmd);
00891 
00892     if(status < 0) {
00893       ERRPRINTF("command failed: '%s'\n", cmd);
00894       return -1;
00895     }
00896 
00897     if(WEXITSTATUS(status) != 0)
00898       break;
00899 
00900     sleep(5);
00901   }
00902 
00903   return 0;
00904 }
00905 
00914 int
00915 gs_service_batch_request(gs_service_info_t *s)
00916 {
00917   char *problemstr = NULL;
00918   double service_et;
00919   FILE *xmlfile;
00920   pid_t pid;
00921 
00922   gs_service_pid = 0;
00923 
00924   if((gs_signal(SIGCHLD, gs_dummy_signal_handler) == SIG_ERR) ||
00925      (gs_signal(SIGTERM, gs_batch_service_sigterm_handler) == SIG_ERR)) {
00926     ERRPRINTF("Error: could not ignore SIGCHLD\n");
00927     s->err = GS_SVC_ERR_SIGNALS;
00928     return -1;
00929   }
00930 
00931   /* first save the problem struct to a file */
00932   xmlfile = fopen(GS_BATCH_XML, "w");
00933   
00934   if(!xmlfile) {
00935     ERRPRINTF("Could not create xml file.\n");
00936     s->err = GS_SVC_ERR_CREATE_XML;
00937     return -1;
00938   }
00939   
00940   if(gs_encode_problem(&problemstr, s->problem) < 0) {
00941     ERRPRINTF("Could not encode problem.\n");
00942     s->err = GS_SVC_ERR_PROBLEM_ENC;
00943     return -1;
00944   }
00945    
00946   fprintf(xmlfile, "%s\n", problemstr);
00947 
00948   fclose(xmlfile);
00949     
00950   /* then save the args */
00951 
00952   if(gs_save_input_args_to_file("input", s->problem, s->my_dsig, GS_CALL_FROM_C, 
00953        s->problem->major) < 0) {
00954     ERRPRINTF("Error saving input args.\n");
00955     s->err = GS_SVC_ERR_CREAT_DATA_FILE;
00956     return -1;
00957   }
00958 
00959   /* fork a child process to execute the batch service */
00960   pid = fork();
00961 
00962   if(pid == -1) {
00963     ERRPRINTF("Failed to fork\n");
00964     s->err = GS_SVC_ERR_FORK;
00965     return -1;
00966   }
00967 
00968   if(pid == 0) {
00969     if(gs_exec_batch_service(s) < 0)
00970       _exit(s->err);
00971 
00972     _exit(0);
00973   }
00974   else {
00975     int cstat_loc, status;
00976     pid_t child;
00977 
00978     gs_service_pid = pid;
00979 
00980     /* this is the parent.  */
00981 
00982     child = waitpid(pid, &cstat_loc, 0);
00983 
00984     if(child < 0) {
00985       ERRPRINTF("Error waiting for batch service process %d.\n", (int)pid);
00986       s->err = GS_SVC_ERR_WAITPID;
00987       return -1;
00988     }
00989 
00990     if(WIFEXITED(cstat_loc) == 0) {
00991       ERRPRINTF("batch service process %d did not terminate.\n", (int)pid);
00992       s->err = GS_SVC_ERR_ABNORMAL_EXIT;
00993       return -1;
00994     }
00995 
00996     status = WEXITSTATUS(cstat_loc);
00997 
00998     if(status != 0) {
00999       ERRPRINTF("batch service process %d terminated abnormally (status %d).\n",
01000         (int)pid, (char)status);
01001       s->err = (char)status > 0 ? (char)status : GS_SVC_ERR_UNSPECIFIED;
01002       return -1;
01003     }
01004 
01005     if(gs_wait_for_batch_job_completion(s) < 0) {
01006       ERRPRINTF("Failed to wait for job completion.\n");
01007       s->err = GS_SVC_ERR_WAITPID;
01008       return -1;
01009     }
01010 
01011     if(s->blocking) {
01012       char filename[5];
01013       int fd;
01014 
01015       sprintf(filename, "data");
01016       if((fd = open(filename, O_RDONLY)) == -1) {
01017         ERRPRINTF("failed to open output data\n");
01018         s->err = GS_SVC_ERR_OPEN_DATA_FILE;
01019         return -1;
01020       }
01021 
01022       if(gs_restore_output_args_from_file(fd, s->problem, s->my_dsig) < 0) {
01023         ERRPRINTF("failed to restore output data from disk\n");
01024         close(fd);
01025         s->err = GS_SVC_ERR_RESTORE_ARGS;
01026         return -1;
01027       }
01028 
01029       close(fd);
01030 
01031       if(gs_send_tag(s->sock, GS_PROT_OK) < 0) {
01032         ERRPRINTF("Error sending tag.\n");
01033         s->err = GS_SVC_ERR_IO;
01034         return -1;
01035       }
01036 
01037       if(gs_send_output_args(s->sock, s->problem, s->my_dsig) < 0) {
01038         ERRPRINTF("Error sending output args.\n");
01039         s->err = GS_SVC_ERR_IO;
01040         return -1;
01041       }
01042 
01043       if(gs_create_timestamp_file(".", "retrieved", 0.0))
01044         ERRPRINTF("Warning: failed to create 'retrieved' file.\n");
01045     }
01046 
01047     service_et = gs_read_service_et("done");
01048 
01049     if(gs_decrement_job_count(s->srv_job_count) < 0)
01050       ERRPRINTF("Warning: failed to decrement job count.\n");
01051 
01052     if(gs_notify_agent_problem_complete(s->agent, s->agentport, s->problem_name,
01053          s->srv_cid, s->cli_username, s->cli_hostname, s->cli_cid, s->request_id,
01054          s->agent_taskid, service_et) < 0)
01055       ERRPRINTF("Warning: failed sending problem solve notification.\n");
01056   }
01057 
01058   return 0;
01059 }
01060 
01061 
01062 int
01063 gs_get_category_names(gs_pm_model_t *model, gs_problem_t *prob, char ***arr)
01064 {
01065   gs_argument_t *argptr;
01066   char **cat_names;
01067   int i;
01068 
01069   cat_names = (char **)malloc(model->nb_categories * sizeof(char *));
01070 
01071   if(!cat_names)
01072     return -1;
01073 
01074   i = 0;
01075   for(argptr=prob->arglist; argptr != NULL; argptr=argptr->next) {
01076     if(argptr->arg_enum) {
01077       cat_names[i] = argptr->name;
01078       i++;
01079     }
01080   }
01081 
01082   *arr = cat_names;
01083 
01084   return 0;
01085 }
01086 
01087 int
01088 gs_get_param_exprs(gs_pm_model_t *model, char *comp_model, char ***arr)
01089 {
01090   char *cm_copy, *cp, **pexp;
01091   int i;
01092   
01093   cm_copy = strdup(comp_model);
01094   pexp = (char **)malloc(model->nb_params * sizeof(char *));
01095         
01096   if(!cm_copy || !pexp) {
01097     if(cm_copy) free(cm_copy);
01098     if(pexp) free(pexp);
01099     return -1;
01100   }
01101         
01102   cp = cm_copy;
01103   i = 0;
01104 
01105   while(cp) {
01106     pexp[i] = cp;
01107     i++;
01108     cp = strchr(cp, ';');
01109 
01110     if(cp) {
01111       *cp = 0;
01112       cp++;
01113     }
01114   }
01115 
01116   *arr = pexp;
01117 
01118   return 0;
01119 }
01120 
01121 int
01122 gs_gen_expr(int i, int numrows, char **cat_names, char **param_expr, 
01123    double **cat_mat, double **coef_mat, gs_pm_model_t *model, FILE *cf)
01124 {
01125   int j;
01126 
01127   if(i == numrows) {
01128     fprintf(cf, "-1");
01129     return 0;
01130   }
01131 
01132   fprintf(cf, "(");
01133   for(j=0;j<model->nb_categories;j++) {
01134     fprintf(cf, "(%s == %g)", cat_names[j], cat_mat[i][j]);
01135 
01136     if(j<model->nb_categories-1)
01137       fprintf(cf, " && ");
01138   }
01139   fprintf(cf, ")");
01140 
01141   fprintf(cf, "?");
01142 
01143   fprintf(cf, "(");
01144   for(j=0;j<model->nb_params;j++) {
01145     fprintf(cf, " (%g * (%s)) ", coef_mat[i][j], param_expr[j]);
01146 
01147     if(j<model->nb_params-1)
01148       fprintf(cf, " + ");
01149   }
01150   fprintf(cf, ")");
01151   fprintf(cf, ":");
01152   fprintf(cf, "(");
01153   gs_gen_expr(i+1, numrows, cat_names, param_expr, cat_mat, coef_mat, model, cf);
01154   fprintf(cf, ")");
01155 
01156   return 0;
01157 }
01158 
01159 int
01160 gs_generate_pm_expr(gs_pm_model_t *model, char *comp_model, gs_problem_t *prob, FILE *cf)
01161 {
01162   char **cat_names, **param_expr;
01163   double **cat_mat, **coef_mat;
01164   int numrows;
01165   
01166   numrows = gs_pm_all_models(model, &cat_mat, &coef_mat);
01167 
01168   if(gs_get_category_names(model, prob, &cat_names) < 0) {
01169     ERRPRINTF("Error getting category names\n");
01170     return -1;
01171   }
01172 
01173   if(gs_get_param_exprs(model, comp_model, &param_expr) < 0) {
01174     ERRPRINTF("Error getting category names\n");
01175     if(cat_names)
01176       free(cat_names);
01177     return -1;
01178   }
01179 
01180   if(numrows > 0) {
01181     gs_gen_expr(0, numrows, cat_names, param_expr, cat_mat, coef_mat, model, cf);
01182     fprintf(cf, "\n");
01183   }
01184 
01185   free(cat_names);
01186   free(param_expr);
01187 
01188   return 0;
01189 }
01190 
01201 int
01202 gs_update_perf_model(gs_service_info_t *s, char *model_fname, char *coef_fname,
01203     double elapsed_time)
01204 {
01205   int i, new_model, num_expr, fd;
01206   char *comp_model, *cm_copy, *tok;
01207   gs_arg_enum_t *arg_enum = NULL;
01208   gs_argument_t *argptr;
01209   gs_pm_model_t *model;
01210   struct stat stbuf;
01211   icl_hash_t *symtab;
01212   FILE *coef_file;
01213   double j;
01214 
01215   model = NULL;
01216 
01217   comp_model = gs_problem_getinfo(s->problem, "COMPLEXITY_MODEL", NULL);
01218 
01219   if(!comp_model)
01220     return 0;
01221 
01222   new_model = stat(model_fname, &stbuf) < 0;
01223 
01224   if((fd = gs_open_locked_file(model_fname, F_WRLCK, O_RDWR | O_CREAT)) < 0) {
01225     ERRPRINTF("Warning: failed to open perf model file '%s'.\n", model_fname);
01226     return -1;
01227   }
01228 
01229   if(new_model) {
01230     int num_categories = 0;
01231 
01232     /* model does not exist yet, so create one now */
01233 
01234     num_expr = 1;
01235     for(i=0;i<strlen(comp_model);i++)
01236       if(comp_model[i] == ';')
01237         num_expr++;
01238 
01239     for(argptr=s->problem->arglist; argptr != NULL; argptr=argptr->next)
01240       if(argptr->arg_enum)
01241         num_categories++;
01242 
01243     model = gs_pm_init_model(num_categories, num_expr, GS_PM_MAX_RUNS);
01244   }
01245   else {
01246     /* model already exists, so load from disk */
01247 
01248     model = gs_pm_load(fd);
01249   }
01250 
01251   if(!model) {
01252     ERRPRINTF("Failed to intialize model\n");
01253     gs_unlock_fd(fd);
01254     close(fd);
01255     return -1;
01256   }
01257 
01258   if(gs_construct_scalar_hashtable(&symtab, s->problem, GS_IN) < 0) {
01259     ERRPRINTF("Failed to construct hash table for scalars\n");
01260     gs_unlock_fd(fd);
01261     close(fd);
01262     return -1;
01263   }
01264 
01265   /* dup since strtok will clobber original */
01266   cm_copy = strdup(comp_model);
01267 
01268   if(!cm_copy) {
01269     ERRPRINTF("strdup failed\n");
01270     icl_hash_destroy(symtab, NULL, NULL);
01271     gs_unlock_fd(fd);
01272     close(fd);
01273     return -1;
01274   }
01275 
01276   for(i=0, tok=NULL; (tok = strtok(tok ? NULL : cm_copy, ";")); i++) {
01277     if(gs_expr_d(tok, &(model->params[i]), symtab) < 0)
01278       ERRPRINTF("Warning: failed to evaluate model expression '%s'\n", tok);
01279   }
01280 
01281   i = 0;
01282   for(argptr=s->problem->arglist; argptr != NULL; argptr=argptr->next) {
01283 
01284     j = 0.0;
01285 
01286     if(argptr->arg_enum) {
01287       int found_enum_match = 0;
01288 
01289       for(arg_enum=argptr->arg_enum; arg_enum != NULL; arg_enum=arg_enum->next) {
01290         if((strcmp(arg_enum->val, "other") == 0) ||
01291            ((argptr->datatype == GS_CHAR) && !strncmp(argptr->data, arg_enum->val, 1)) ||
01292            ((argptr->datatype != GS_CHAR) && (argptr->expr_val == atof(arg_enum->val))))
01293         {
01294           found_enum_match = 1;
01295           model->categories[i] = j;
01296           break;
01297         }
01298 
01299         j += 1.0;
01300       }
01301 
01302       if(!found_enum_match) {
01303         ERRPRINTF("No match in model for arg %s\n", argptr->name);
01304         icl_hash_destroy(symtab, NULL, NULL);
01305         gs_unlock_fd(fd);
01306         close(fd);
01307         return -1;
01308       }
01309 
01310       i++;
01311     }
01312   }
01313 
01314   gs_pm_store_timing(elapsed_time, model);
01315 
01316   lseek(fd, 0, SEEK_SET);
01317 
01318   /* write model to disk.. */
01319   if(gs_pm_save(model, fd) < 0) {
01320     ERRPRINTF("Failed to save model to disk\n");
01321     icl_hash_destroy(symtab, NULL, NULL);
01322     gs_unlock_fd(fd);
01323     close(fd);
01324     return -1;
01325   }
01326 
01327   coef_file = fopen(coef_fname, "w");
01328 
01329   if(coef_file) {
01330     gs_generate_pm_expr(model, comp_model, s->problem, coef_file);
01331     fclose(coef_file);
01332   }
01333 
01334   gs_pm_free_model(model);
01335   icl_hash_destroy(symtab, NULL, NULL);
01336   gs_unlock_fd(fd);
01337   close(fd);
01338 
01339   return 0;
01340 }
01341 
01349 double
01350 gs_pm_problem_service(gs_service_info_t *s)
01351 {
01352   double start_time, elapsed_time;
01353 
01354   start_time = usertime();
01355   gs_problem_service(s->problem);
01356   elapsed_time = usertime() - start_time;
01357 
01358 #ifdef GS_PM_DISABLE
01359   if(strcmp(s->infodir, "-") != 0) {
01360     /* don't bother adding entries where the elapsed time is zero */
01361 
01362     if(elapsed_time > 0.0) {
01363       char *model_fname, *coef_fname;
01364 
01365       model_fname = dstring_sprintf("%s/%s.mdl", s->infodir, s->problem->name);
01366       if(!model_fname)
01367         return -1.0;
01368 
01369       coef_fname = dstring_sprintf("%s/%s.coe", s->infodir, s->problem->name);
01370       if(!coef_fname) {
01371         free(model_fname);
01372         return -1.0;
01373       }
01374 
01375       gs_update_perf_model(s, model_fname, coef_fname, elapsed_time);
01376 
01377       free(model_fname);
01378       free(coef_fname);
01379     }
01380   }
01381 #endif
01382 
01383   return elapsed_time;
01384 }