server_util.c

Go to the documentation of this file.
00001 
00008 /* $Id: server_util.c,v 1.11 2008/03/24 19:53:11 seymour Exp $ */
00009 /* $UTK_Copyright: $ */
00010 
00011 
#include "portability.h"
#include "server.h"
#include "general.h"
#include "comm_data.h"
#include "comm_basics.h"
#include "comm_encode.h"
#include "comm_protocol.h"

#include <errno.h>
#include <limits.h>
00021 
00035 int
00036 gs_server_free(gs_server_t *server)
00037 {
00038   if(!server) return -1;
00039 
00040   if(server->hostname) free(server->hostname);
00041   if(server->arch) free(server->arch);
00042   gs_free_problem(server->problemlist);
00043   if(server->agenthost) free(server->agenthost);
00044   if(server->sa_list) gs_free_infolist(server->sa_list);
00045   if(server->restrictions) gs_free_restrictions(server->restrictions);
00046   if(server->perf_expr) free(server->perf_expr);
00047   free(server);
00048 
00049   return 0;
00050 }
00051 
00052 
00064 int
00065 gs_do_ping(gs_server_t *srv, char *msg, int len, double *et)
00066 {
00067   double start_time, elapsed_time;
00068   int rv, tag, sock;
00069 
00070   elapsed_time = 0.0;
00071   rv = -1;
00072 
00073   sock = gs_connect_to_host(srv->componentid, srv->ipaddress, srv->port,
00074                             srv->proxyip, srv->proxyport);
00075 
00076   if(sock == INVALID_SOCKET) {
00077     ERRPRINTF("invalid socket\n");
00078     return -1;
00079   }
00080 
00081   if(gs_send_tag(sock, GS_PROT_PING) < 0) {
00082     ERRPRINTF("failed to send tag\n");
00083     goto ping_err;
00084   }
00085 
00086   if(gs_send_string(sock, VERSION) < 0) {
00087     ERRPRINTF("failed to send version string %s\n", VERSION);
00088     goto ping_err;
00089   }
00090 
00091   if(gs_recv_tag(sock, &tag) < 0) {
00092     ERRPRINTF("failed to recv response tag\n");
00093     goto ping_err;
00094   }
00095 
00096   if(tag != GS_PROT_OK) {
00097     ERRPRINTF("response tag != GS_PROT_OK\n");
00098     goto ping_err;
00099   }
00100 
00101   if(gs_send_int(sock, len) < 0) {
00102     ERRPRINTF("failed to send length of data\n");
00103     goto ping_err;
00104   }
00105 
00106   start_time = walltime();
00107 
00108   if(gs_twrite(sock, msg, len) <= 0) {
00109     ERRPRINTF("error sending ping data\n");
00110     goto ping_err;
00111   }
00112 
00113   if(gs_tread(sock, msg, len) <= 0) {
00114     ERRPRINTF("error recving ping data\n");
00115     goto ping_err;
00116   }
00117 
00118   elapsed_time = walltime() - start_time;
00119 
00120   /* set return val to success and fall through */
00121   rv = 0;
00122 
00123 ping_err:
00124 
00125   *et = elapsed_time;
00126   gs_close_socket(sock);
00127 
00128   return rv;
00129 }
00130 
00131 
00132 
00141 int
00142 gs_free_restrictions(gs_restriction_t *rlist)
00143 {
00144   gs_restriction_t *rp, *tmp;
00145 
00146   rp = rlist;
00147 
00148   while(rp) {
00149     tmp = rp->next;
00150     free(rp);
00151     rp = tmp;
00152   }
00153 
00154   return 0;
00155 }
00156 
00157 
00158 
00159 
00160 #ifndef WIN32
00161 
00168 int
00169 gs_clean_up_old_temp_files(char *infodir)
00170 {
00171   char sqlite_db_pattern[FN_LEN], stats_file_pattern[FN_LEN],
00172      sensor_usock_pattern[FN_LEN], jobcount_pattern[FN_LEN],
00173      lock_file_pattern[FN_LEN], journal_pattern[FN_LEN], *prefix;
00174   int i, globflags = GLOB_NOCHECK | GLOB_NOSORT;
00175   glob_t *pglob;
00176 
00177   pglob = (glob_t *) malloc(sizeof(glob_t));
00178 
00179   if(!pglob) return -1;
00180 
00181   prefix = infodir ? infodir : GS_INFODIR_PATH;
00182 
00183   sprintf(sqlite_db_pattern, "%s/%s.[0-9]*", prefix, GRIDSOLVE_SQLITE_DB_PREFIX);
00184   sprintf(journal_pattern, "%s/%s.[0-9]*-journal", prefix, GRIDSOLVE_SQLITE_DB_PREFIX);
00185   sprintf(stats_file_pattern, "%s/%s.[0-9]*", prefix, GRIDSOLVE_STATS_FILE_PREFIX);
00186   sprintf(jobcount_pattern, "%s/%s.[0-9]*", prefix, GS_SERVER_JOB_COUNT_FILE_PREFIX);
00187   sprintf(sensor_usock_pattern, "%s/%s.[0-9]*", prefix, GRIDSOLVE_SENSOR_USOCK_PREFIX);
00188   sprintf(lock_file_pattern, "%s/%s.[0-9]*", prefix, GRIDSOLVE_SCHED_LOCK_FILE_PREFIX);
00189 
00190   if(glob(sqlite_db_pattern, globflags, NULL, pglob) ||
00191      glob(journal_pattern, globflags | GLOB_APPEND, NULL, pglob) ||
00192      glob(stats_file_pattern, globflags | GLOB_APPEND, NULL, pglob) ||
00193      glob(lock_file_pattern, globflags | GLOB_APPEND, NULL, pglob) ||
00194      glob(jobcount_pattern, globflags | GLOB_APPEND, NULL, pglob) ||
00195      glob(sensor_usock_pattern, globflags | GLOB_APPEND, NULL, pglob))
00196   {
00197     globfree(pglob);
00198     free(pglob);
00199     return 0;
00200   }
00201 
00202   for (i=0; i<pglob->gl_pathc; i++) {
00203     int pid_to_check, rv;
00204     struct stat stbuf;
00205 
00206     /* since GLOB_NOCHECK is specified, must skip patterns in gl_pathv[] */
00207     if(stat(pglob->gl_pathv[i], &stbuf) != 0)
00208       continue;
00209 
00210     if((pid_to_check = gs_parse_pid_from_tempname(pglob->gl_pathv[i])) < 0)
00211       continue;
00212 
00213     rv = kill(pid_to_check, 0);
00214 
00215     /* if kill returns 0 or -1 w/EPERM, then there is probably a process with
00216      * the given PID running now.  in that case we don't want to try to
00217      * delete this file since it may be in use.
00218      */
00219     if((rv == 0) || ((rv < 0) && (errno == EPERM)))
00220       continue;
00221 
00222     LOGPRINTF("Removing old file '%s'\n", pglob->gl_pathv[i]);
00223     unlink(pglob->gl_pathv[i]);
00224   }
00225 
00226   globfree(pglob);
00227   free(pglob);
00228 
00229   return 0;
00230 }
00231 
/**
 * Extracts the pid from a request id.
 *
 * The pid is the third underscore-separated field, i.e. the text between
 * the second and third underscore ("<a>_<b>_<pid>_<rest>").  The input
 * string is not modified.
 *
 * @param reqid -- the request id to parse
 *
 * @returns the pid on success, -1 if reqid is NULL, has fewer than three
 *          underscores, or the pid field does not start with a digit or
 *          does not fit in an int.
 */
int
gs_parse_pid_from_requestid(char *reqid)
{
  const char *field;
  long pid;
  int skipped;

  if(!reqid)
    return -1;

  /* step over the first two fields; test strchr() BEFORE advancing so a
   * missing underscore is reported instead of dereferencing NULL+1
   */
  field = reqid;
  for(skipped = 0; skipped < 2; skipped++) {
    field = strchr(field, '_');
    if(!field)
      return -1;
    field++;
  }

  /* the pid field must be terminated by a third underscore */
  if(!strchr(field, '_'))
    return -1;

  if(*field < '0' || *field > '9')
    return -1;

  /* strtol stops at the terminating '_', so no truncated copy is needed */
  errno = 0;
  pid = strtol(field, NULL, 10);
  if(errno == ERANGE || pid > INT_MAX)
    return -1;

  return (int)pid;
}
00280 
/**
 * Extracts the pid from a temp file name of the form "<prefix>.<pid>...".
 *
 * Only the final path component is examined, so dots in directory names
 * are not mistaken for the prefix/pid separator.  Within that component
 * the pid is the second '.'-separated token.  The input string is not
 * modified and no global tokenizer state is touched.
 *
 * @param tempname -- file name or path to parse
 *
 * @returns the pid on success, -1 if tempname is NULL, has no second
 *          token, or the token does not start with a digit or does not
 *          fit in an int.
 */
int
gs_parse_pid_from_tempname(char *tempname)
{
  const char *name, *tok;
  long pid;

  if(!tempname)
    return -1;

  /* restrict parsing to the last path component */
  name = strrchr(tempname, '/');
  name = name ? name + 1 : tempname;

  /* locate the second '.'-delimited token: skip any leading dots, the
   * first token, and the run of dots that follows it
   */
  tok = name + strspn(name, ".");
  tok += strcspn(tok, ".");
  tok += strspn(tok, ".");

  if(*tok < '0' || *tok > '9')
    return -1;

  /* trailing text such as "-journal" simply ends the number */
  errno = 0;
  pid = strtol(tok, NULL, 10);
  if(errno == ERANGE || pid > INT_MAX)
    return -1;

  return (int)pid;
}
00314 
/**
 * Reads the job count stored in the given file.
 *
 * The file is opened with a read lock and is expected to hold one int in
 * binary form.  An empty file is reported as a count of 0.
 *
 * @param file -- path of the job count file
 * @param jcount -- receives the count on success; left untouched on failure
 *
 * @returns 0 on success, -1 if the file cannot be opened/locked, the read
 *          fails, or the file holds only part of a counter.
 */
int
gs_get_job_count(char *file, int *jcount)
{
  int fd, count = 0, rv = 0;
  ssize_t n;

  if((fd = gs_open_locked_file(file, F_RDLCK, O_RDONLY)) < 0)
    return -1;

  n = read(fd, &count, sizeof(count));

  if(n == 0)
    count = 0;                            /* empty file: nothing recorded */
  else if(n != (ssize_t)sizeof(count))
    rv = -1;                              /* read error or truncated value */

  gs_unlock_file(fd);
  close(fd);

  if(rv == 0)
    *jcount = count;

  return rv;
}
00347 
/**
 * Updates the job count stored in the given file.
 *
 * The file is opened (created if necessary) with a write lock, the current
 * count is read, modified according to op, clamped at 0, and written back
 * at the start of the file.
 *
 * @param file -- path of the job count file
 * @param op -- '+' to increment, '-' to decrement, 'i' to reset to 0.
 *              Any other value logs a warning and resets the count to 0.
 *
 * @returns 0 on success, -1 if the file cannot be opened/locked, read,
 *          rewound, or completely written.
 */
int
gs_modify_job_count(char *file, int op)
{
  int fd, count = 0, rv = -1;
  ssize_t n;

  if((fd = gs_open_locked_file(file, F_WRLCK, O_RDWR | O_CREAT)) < 0) {
    ERRPRINTF("Failed to open/lock file '%s' for job count\n", file);
    return -1;
  }

  if((n = read(fd, &count, sizeof(count))) < 0) {
    ERRPRINTF("read failure getting job count\n");
    goto done;
  }

  /* an empty or truncated file carries no usable value: start from 0 */
  if(n != (ssize_t)sizeof(count))
    count = 0;

  switch(op) {
    case '+':
      count++;
      break;
    case '-':
      count--;
      break;
    case 'i':
      count = 0;
      break;
    default:
      ERRPRINTF("Warning: bad op type\n");
      count = 0;
      break;
  }

  if(count < 0)
    count = 0;

  /* a failed rewind or short write must not be reported as success */
  if(lseek(fd, 0, SEEK_SET) < 0) {
    ERRPRINTF("lseek failure updating job count\n");
    goto done;
  }

  if(write(fd, &count, sizeof(count)) != (ssize_t)sizeof(count)) {
    ERRPRINTF("write failure updating job count\n");
    goto done;
  }

  rv = 0;

done:
  gs_unlock_file(fd);
  close(fd);

  return rv;
}
00405 
/**
 * Initializes the job count file.
 *
 * If stat() succeeds on the file it is unlinked first; the count is then
 * set through gs_modify_job_count() with the 'i' operation.
 *
 * @param file -- path of the job count file
 *
 * @returns -1 if an existing file cannot be unlinked, otherwise the result
 *          of gs_modify_job_count().
 */
int
gs_init_job_count(char *file)
{
  struct stat info;
  int already_there;

  already_there = (stat(file, &info) == 0);

  if(already_there && unlink(file) < 0) {
    return -1;
  }

  return gs_modify_job_count(file, 'i');
}
00425 
/**
 * Increments the job count stored in the given file.
 *
 * @param file -- path of the job count file
 *
 * @returns the result of gs_modify_job_count() for the '+' operation.
 */
int
gs_increment_job_count(char *file)
{
  const int op = '+';

  return gs_modify_job_count(file, op);
}
00439 
/**
 * Decrements the job count stored in the given file.
 *
 * @param file -- path of the job count file
 *
 * @returns the result of gs_modify_job_count() for the '-' operation.
 */
int
gs_decrement_job_count(char *file)
{
  const int op = '-';

  return gs_modify_job_count(file, op);
}
00453 
00473 int
00474 gs_notify_agent_problem_complete(char *agenthost, in_port_t agentport,
00475   char *problem_name, char *server_cid, char *user_name,
00476   char *host_name, char *client_cid, char *request_id, int agent_taskid,
00477   double run_time)
00478 {
00479   int sock_agent, tag;
00480   pid_t newpid;
00481   char *msg;
00482 
00483   newpid = fork();
00484 
00485   if(newpid < 0) {
00486     ERRPRINTF("fork() failed.\n");
00487     return -1;
00488   }
00489 
00490   if(newpid == 0) {
00491     if(gs_encode_problem_complete_notification(&msg, server_cid, request_id,
00492          agent_taskid, run_time) < 0)
00493     {
00494       ERRPRINTF("Unable to encode message.\n");
00495       _exit(-1);
00496     }
00497 
00498     sock_agent = gs_connect_direct(agenthost, agentport);
00499 
00500     if(sock_agent < 0) {
00501       ERRPRINTF("Unable to connect to agent.\n");
00502       _exit(-1);
00503     }
00504 
00505     if((gs_send_tag(sock_agent, GS_PROT_NOTIFY_COMPLETE) < 0) ||
00506        (gs_send_string(sock_agent, VERSION) < 0)) {
00507       ERRPRINTF("failed to send tag\n");
00508       gs_close_socket(sock_agent);
00509       _exit(-1);
00510     }
00511 
00512     if(gs_recv_tag(sock_agent, &tag) < 0) {
00513       ERRPRINTF("Error communicating with agent.\n");
00514       gs_close_socket(sock_agent);
00515       _exit(-1);
00516     }
00517 
00518     if(tag != GS_PROT_OK) {
00519       if(tag == GS_PROT_VERSION_MISMATCH)
00520         ERRPRINTF("Error: Agent is an incompatible version\n");
00521       else
00522         ERRPRINTF("Error: Agent refused with code %d\n", tag);
00523       gs_close_socket(sock_agent);
00524       _exit(-1);
00525     }
00526 
00527     if(gs_send_string(sock_agent, msg) < 0) {
00528       ERRPRINTF("Error communicating with agent.\n");
00529       gs_close_socket(sock_agent);
00530       _exit(-1);
00531     }
00532 
00533     gs_close_socket(sock_agent);
00534     _exit(0);
00535   }
00536 
00537   return 0;
00538 }
00539 
00559 int
00560 gs_notify_agent_problem_solve(char *agenthost, in_port_t agentport,
00561   char *problem_name, double est_time, char *server_cid, char *user_name,
00562   char *host_name, char *client_cid, char *request_id, int agent_taskid,
00563   double agent_est_time)
00564 {
00565   int sock_agent, tag;
00566   pid_t newpid;
00567   char *msg;
00568 
00569   newpid = fork();
00570 
00571   if(newpid < 0) {
00572     ERRPRINTF("fork() failed.\n");
00573     return -1;
00574   }
00575 
00576   if(newpid == 0) {
00577     if(gs_encode_problem_solve_notification(&msg, problem_name, est_time,
00578          server_cid, user_name, host_name, client_cid, request_id,
00579          agent_taskid, agent_est_time) < 0)
00580     {
00581       ERRPRINTF("Unable to encode message.\n");
00582       _exit(-1);
00583     }
00584 
00585     sock_agent = gs_connect_direct(agenthost, agentport);
00586 
00587     if(sock_agent < 0) {
00588       ERRPRINTF("Unable to connect to agent.\n");
00589       _exit(-1);
00590     }
00591 
00592     if((gs_send_tag(sock_agent, GS_PROT_NOTIFY_SUBMIT) < 0) ||
00593        (gs_send_string(sock_agent, VERSION) < 0)) {
00594       ERRPRINTF("failed to send tag\n");
00595       gs_close_socket(sock_agent);
00596       _exit(-1);
00597     }
00598 
00599     if(gs_recv_tag(sock_agent, &tag) < 0) {
00600       ERRPRINTF("Error communicating with agent.\n");
00601       gs_close_socket(sock_agent);
00602       _exit(-1);
00603     }
00604 
00605     if(tag != GS_PROT_OK) {
00606       if(tag == GS_PROT_VERSION_MISMATCH)
00607         ERRPRINTF("Error: Agent is an incompatible version\n");
00608       else
00609         ERRPRINTF("Error: Agent refused with code %d\n", tag);
00610       gs_close_socket(sock_agent);
00611       _exit(-1);
00612     }
00613 
00614     if(gs_send_string(sock_agent, msg) < 0) {
00615       ERRPRINTF("Error communicating with agent.\n");
00616       gs_close_socket(sock_agent);
00617       _exit(-1);
00618     }
00619 
00620     gs_close_socket(sock_agent);
00621     _exit(0);
00622   }
00623 
00624   return 0;
00625 }
00626 
00640 int
00641 gs_create_info_dir(char componentid[CID_LEN], ipaddr_t ipaddress, in_port_t port, char **dirname)
00642 {
00643   char server_dottedIP[20], cid_string[2*CID_LEN+1], *si_name, *cid_file,
00644     userid[256], *username;
00645   PROXY_COMPONENTADDR myaddr;
00646   struct stat stbuf;
00647   FILE *f;
00648   uid_t uid;
00649 
00650   *dirname = 0;
00651 
00652   username = gs_get_login_name();
00653   if(!username) {
00654     ERRPRINTF("Warning: couldn't get user id\n");
00655     username = "UnknownUserID";
00656   }
00657 
00658   uid = getuid();  /* getuid() always succeeds according to the man page */
00659 
00660   sprintf(userid, "%s_uid%d", username, (int)uid);
00661 
00662   myaddr = proxy_get_local_addr();
00663   memcpy(componentid, myaddr.ID, CID_LEN);
00664 
00665   si_name = dstring_sprintf("%s", GS_INFODIR_PATH);
00666 
00667   if(stat(si_name, &stbuf) != 0) {
00668 
00669     if(mkdir(si_name, 0700) < 0) {
00670       free(si_name);
00671       return -1;
00672     }
00673   }
00674 
00675   free(si_name);
00676 
00677   proxy_ip_to_str(ipaddress, server_dottedIP);
00678 
00679   si_name = dstring_sprintf("%s/%s%s_%hu_%s", GS_INFODIR_PATH,
00680      GS_INFODIR_PREFIX, server_dottedIP, port, userid);
00681 
00682   if(stat(si_name, &stbuf) != 0) {
00683     if(mkdir(si_name, 0700) < 0) {
00684       free(si_name);
00685       return -1;
00686     }
00687   }
00688 
00689   cid_file = dstring_sprintf("%s/%s%s_%hu_%s/cid", GS_INFODIR_PATH,
00690      GS_INFODIR_PREFIX, server_dottedIP, port, userid);
00691 
00692   if(stat(cid_file, &stbuf) != 0) {
00693     if((f = fopen(cid_file, "w")) == NULL) {
00694       free(si_name);
00695       free(cid_file);
00696       return -1;
00697     }
00698 
00699     proxy_cid_to_str(cid_string, myaddr.ID);
00700 
00701     fprintf(f, "%s\n", cid_string);
00702     fclose(f);
00703   }
00704   else {
00705     if((f = fopen(cid_file, "r")) == NULL) {
00706       free(si_name);
00707       free(cid_file);
00708       return -1;
00709     }
00710 
00711     free(cid_file);
00712 
00713     if(fgets(cid_string, 2*CID_LEN+1, f) == NULL) {
00714       fclose(f);
00715       free(si_name);
00716       return -1;
00717     }
00718 
00719     cid_string[2*CID_LEN] = 0;
00720 
00721     fclose(f);
00722 
00723     proxy_set_cid_from_str(cid_string);
00724     myaddr = proxy_get_local_addr();
00725   }
00726 
00727   memcpy(componentid, myaddr.ID, CID_LEN);
00728 
00729   *dirname = si_name;
00730 
00731   return 0;
00732 }
00733 
00734 
00735 
00736 #endif /* WIN32 */