workload_report.c

Go to the documentation of this file.
00001 
00008 /* $Id: workload_report.c,v 1.58 2007/05/11 14:48:38 yarkhan Exp $ */
00009 /* $UTK_Copyright: $ */
00010 
00011 #include <string.h>
00012 #include <sys/types.h>
00013 #include <sys/socket.h>
00014 #include <sys/stat.h>
00015 #include <sys/time.h>
00016 #include <netinet/in.h>
00017 #include <netdb.h>
00018 #include <stdio.h>
00019 #include <unistd.h>
00020 #include <glob.h>
00021 #include <dirent.h>
00022 #include <errno.h>
00023 #include <utime.h>
00024 
00025 #include "server.h"
00026 #include "utility.h"
00027 #include "comm_basics.h"
00028 #include "comm_data.h"
00029 #include "comm_encode.h"
00030 #include "general.h"
00031 #include "gs_pm_model.h"
00032 
00033 /* glob pattern for matching all gsrequest subdirectories */
00034 
00035 #define GSREQUEST_ALL_GLOB_PATTERN "gsrequest_%s_[0-9]*_*"
00036 
00037 /* Local function prototypes */
00038 
00039 static int 
00040   gs_send_workload_report(int, gs_workload_packet *),
00041   gs_remove_output_data(char *),
00042   gs_remove_dead_blocking_dirs(gs_server_t *),
00043   gs_find_and_remove_retrieved_results(gs_server_t *, char *),
00044   gs_prepare_coefficient_updates(gs_server_t *, char **);
00045 
00057 void 
00058 gs_workload_report(void **args)
00059 {
00060   gs_server_t *gs_server;
00061   PROXY_COMPONENTADDR myaddr;
00062   int sock;
00063   gs_workload_packet wp;
00064   int retVal;
00065   char *portstr;
00066 
00067   if(!args || !args[0])
00068     return;
00069 
00070   gs_server = args[0];
00071  
00072   wp.coeff_update = NULL;
00073 
00074   gs_prepare_coefficient_updates(gs_server, &wp.coeff_update);
00075 
00076   if(!wp.coeff_update)
00077     wp.coeff_update = strdup("0");
00078 
00079   wp.agent_host = getenv("GRIDSOLVE_AGENT");
00080 
00081   if(!wp.agent_host)
00082     wp.agent_host = gs_server->agenthost;
00083 
00084   if(!wp.agent_host) {
00085     ERRPRINTF("agent host not in server_config or GRIDSOLVE_AGENT env var\n");
00086     return;
00087   }
00088 
00089   portstr = getenv("GRIDSOLVE_AGENT_PORT");
00090 
00091   if(portstr)
00092     wp.agent_port = atoi(portstr);
00093   else
00094     wp.agent_port = GRIDSOLVE_AGENT_PORT_DEFAULT;
00095 
00096   /*
00097    * Fill in static information
00098    */
00099 
00100   wp.msgtype = GS_PROT_WORKLOAD_REPORT;
00101 
00102   myaddr = proxy_get_local_addr();
00103   memcpy(wp.server_cid, myaddr.ID, CID_LEN);
00104 
00105   wp.server_workload = gs_get_workload();
00106   /* This scaling is used if the GridSolve monitor is having problems seeing the workload */
00107   /* wp.server_workload = gs_get_workload() * getenv_int("GRIDSOLVE_WORKLOAD_SCALING", 1); */
00108 
00109   if(wp.server_workload < 0) {
00110     DBGPRINTF("Unable to get workload.  Using default of 50\n");
00111     wp.server_workload = 50;
00112   }
00113 
00114   wp.nproblems = gs_server->nproblems;
00115 
00116   sock = gs_connect_direct(wp.agent_host, wp.agent_port);
00117 
00118   if(sock < 0)  {
00119     DBGPRINTF("Could not connect to agent.\n");
00120     return;
00121   }
00122 
00123   if( (retVal = gs_send_workload_report(sock, &wp)) < 0){
00124     DBGPRINTF("Error sending workload report.\n");
00125     if(retVal == -2) {
00126       /* Since we need to re-register with the server,
00127        * clear the previous problem list so that all
00128        * problems are sent.
00129        */
00130 
00131       if(gs_server->problemlist)
00132         gs_free_problem(gs_server->problemlist);
00133       gs_server->problemlist = NULL;
00134 
00135       close(sock);
00136       gs_server_register(gs_server);
00137     }
00138   } 
00139 
00140   close(sock);
00141 
00142   gs_find_and_remove_retrieved_results(gs_server, "retrieved");
00143   gs_find_and_remove_retrieved_results(gs_server, "cancelled");
00144   gs_remove_dead_blocking_dirs(gs_server);
00145 
00146   gs_server_register_problems(gs_server);
00147 
00148   return;
00149 }
00150 
00160 static int
00161 gs_prepare_coefficient_updates(gs_server_t *gs_server, char **msg)
00162 {
00163   icl_list_t *coef_files, *l;
00164   char *cinfo, *update;
00165   int num_updates;
00166 
00167   num_updates = 0;
00168 
00169   update = dstring_sprintf("");
00170   if(!update) return -1;
00171 
00172   coef_files = icl_list_new();
00173 
00174   if(coef_files) {
00175     if(gs_find_coefficient_files(gs_server, coef_files) < 0)
00176       ERRPRINTF("Error finding coefficient files.\n");
00177 
00178     for(l=icl_list_first(coef_files); l!=NULL; l=icl_list_next(coef_files, l)) {
00179       char *filename = (char *)l->data;
00180       long age;
00181 
00182       age = gs_seconds_since_modified(filename);
00183 
00184       if(age < GS_UPDATE_FREQUENCY) {
00185         char *model_filename, *probname, *expr;
00186         int i, mfd;
00187 
00188         model_filename = strdup(filename);
00189 
00190         if(!model_filename) {
00191           ERRPRINTF("strdup failed\n");
00192           return -1;
00193         }
00194 
00195         strncpy(model_filename + (strlen(model_filename) - 3), 
00196            "mdl", 3);
00197 
00198         mfd = open(model_filename, O_RDONLY, 0600);
00199 
00200         if(mfd < 0) {
00201           ERRPRINTF("Warning: failed to open model file '%s'\n", 
00202              model_filename);
00203           continue;
00204         }
00205 
00206         /* note we're obtaining a lock on the model file, not the
00207          * coefficient file.  the model file is the one that will
00208          * be locked by the service processes when both files are
00209          * updated.
00210          */
00211 
00212         if(gs_lock_fd_nowait(mfd, F_RDLCK) < 0) {
00213           struct timeval time_now[2];
00214 
00215           ERRPRINTF("Skipping update of '%s': don't want to wait for lock",
00216             filename);
00217 
00218           close(mfd);
00219 
00220           /* update timestamp of file so that it will be updated next time */
00221           gettimeofday(&time_now[0], NULL);
00222           time_now[0].tv_sec += 10;
00223           time_now[1] = time_now[0];
00224 
00225 #ifdef __INTERIX
00226       /* Replacing utimes with utime */
00227       {
00228         struct utimbuf time_now_utimbuf;
00229         time_now_utimbuf.actime = time_now[0].tv_sec;
00230         time_now_utimbuf.modtime = time_now[1].tv_sec;
00231         utime(filename, &time_now_utimbuf);
00232       }
00233 #else
00234       utimes(filename, time_now);
00235 #endif
00236       
00237           continue;
00238         }
00239 
00240         if(gs_get_contents_of_file(filename, &expr) < 0) {
00241           ERRPRINTF("Warning: failed to read coefficient file '%s'\n", filename);
00242           gs_unlock_fd(mfd);
00243           close(mfd);
00244           continue;
00245         }
00246 
00247         if(expr[strlen(expr)-1] == '\n')
00248           expr[strlen(expr)-1] = 0;
00249 
00250         gs_unlock_fd(mfd);
00251         close(mfd);
00252 
00253         for(i = strlen(filename) - 1; (i >= 0) && (filename[i] != '/'); i--)
00254           /* spin */ ;
00255 
00256         probname = filename + i + 1;
00257 
00258         for(i=0;i<strlen(probname);i++)
00259           if(probname[i] == '.') {
00260             probname[i] = 0;
00261             break;
00262           }
00263           
00264         if(gs_encode_model_update(&cinfo, probname, expr) == 0) {
00265           /* don't use append_free */
00266           update = dstring_append(update, cinfo);
00267           num_updates++;
00268         }
00269 
00270         free(expr);
00271       }
00272     }
00273 
00274     icl_list_destroy(coef_files, free);
00275   }
00276 
00277   *msg = dstring_sprintf("%d\n%s", num_updates, update);
00278   free(update);
00279 
00280   return 0;
00281 }
00282 
00293 static int
00294 gs_send_workload_report(int sock, gs_workload_packet *wp)
00295 {
00296   char *msg, *cu_msg, temp_cid[CID_LEN*2+1];
00297   int tag;
00298 
00299   proxy_cid_to_str(temp_cid, wp->server_cid);
00300 
00301   if(gs_encode_workload_report(&msg, wp->server_workload, wp->nproblems,
00302       temp_cid) < 0) 
00303     return -1;
00304 
00305   if((gs_send_tag(sock, wp->msgtype) < 0) ||
00306      (gs_send_string(sock, VERSION) < 0)) {
00307     free(msg);
00308     return -1;
00309   }
00310 
00311   if(gs_recv_tag(sock,&tag) < 0) 
00312     return -1;
00313 
00314   if(tag != GS_PROT_OK) {
00315     if(tag == GS_PROT_VERSION_MISMATCH)
00316       ERRPRINTF("Warning: agent is an incompatible version\n");
00317     return -1;
00318   }
00319 
00320   if(gs_send_string(sock, msg) < 0) {
00321     free(msg);
00322     return -1;
00323   }
00324 
00325   cu_msg = wp->coeff_update ? wp->coeff_update : "0";
00326 
00327   if(gs_send_string(sock, cu_msg) < 0) {
00328     free(msg);
00329     return -1;
00330   }
00331 
00332   free(msg);
00333 
00334   /* re-register with agent */
00335   if(gs_recv_tag(sock, &tag) < 0) 
00336     return -1;
00337 
00338   if(tag != GS_PROT_OK) {
00339     if(tag == GS_PROT_UNKNOWN_SERVER)
00340       return -2;
00341 
00342     ERRPRINTF("Agent rejected workload report for some unspecified reason\n");
00343     return -1;
00344   }
00345 
00346   return 0;
00347 }
00348 
00359 int
00360 gs_remove_results_if_too_old(gs_server_t *serv, char *reqdir)
00361 {
00362   struct stat stbuf;
00363   struct timeval tv;
00364   long age;
00365 
00366   if(!reqdir) return -1;
00367 
00368   if(stat(reqdir, &stbuf) < 0)
00369     return -1;
00370 
00371   gettimeofday(&tv, NULL);
00372 
00373   age = tv.tv_sec - stbuf.st_atime;
00374 
00375   if((serv->output_ttl > 0) && (age > serv->output_ttl)) {
00376     LOGPRINTF("Removing expired output directory '%s'\n", reqdir);
00377     if(gs_remove_output_data(reqdir) < 0) {
00378       ERRPRINTF("Unable to remove output for '%s'\n", reqdir);
00379       return -1;
00380     }
00381   }
00382 
00383   return 0;
00384 }
00385 
/*
 * gs_count_dir_entries
 *
 * Returns how many entries readdir() yields for directory 'name'.  The
 * count includes "." and "..", so an empty directory normally counts as 2.
 * Returns 0 if the directory cannot be opened.
 */
static int
gs_count_dir_entries(char *name)
{
  DIR *dir = opendir(name);
  int entries = 0;

  if(dir == NULL)
    return 0;

  for(; readdir(dir) != NULL; entries++)
    ;

  closedir(dir);
  return entries;
}
00415 
00425 static int
00426 gs_remove_dead_blocking_dirs(gs_server_t *serv)
00427 {
00428   char *globpattern, temp_cid[CID_LEN*2+1];
00429   int globflags = GLOB_NOSORT;
00430   glob_t pglob;
00431   int i;
00432 
00433   proxy_cid_to_str(temp_cid, serv->componentid);
00434 
00435   globpattern = dstring_sprintf(GSREQUEST_ALL_GLOB_PATTERN, temp_cid);
00436 
00437   if(!globpattern) {
00438     ERRPRINTF("Error creating glob pattern\n");
00439     return -1;
00440   }
00441 
00442   if(glob(globpattern, globflags, NULL, &pglob) != 0) {
00443     free(globpattern);
00444     globfree(&pglob);
00445     return 0;
00446   }
00447 
00448   for (i=0; i<pglob.gl_pathc; i++) {
00449     /* less than 3 to account for "." and ".." */
00450 
00451     if(gs_count_dir_entries(pglob.gl_pathv[i]) < 3) {
00452       int pid_to_check, rv;
00453 
00454       if((pid_to_check = gs_parse_pid_from_requestid(pglob.gl_pathv[i])) < 0)
00455         continue;
00456 
00457       rv = kill(pid_to_check, 0);
00458 
00459       /* if kill returns 0 or -1 w/EPERM, then there is probably a process with
00460        * the given PID running now.  in that case we don't want to try to
00461        * delete this file since it may be in use.
00462        */
00463       if((rv == 0) || ((rv < 0) && (errno == EPERM)))
00464         continue;
00465   
00466       LOGPRINTF("Removing failed blocking request dir '%s'\n", 
00467         pglob.gl_pathv[i]);
00468       rmdir(pglob.gl_pathv[i]);
00469 
00470       if(gs_decrement_job_count(GS_SERVER_JOB_COUNT_FILE) < 0)
00471         ERRPRINTF("Warning: failed to decrement job count.\n");
00472     }
00473   }
00474 
00475   free(globpattern);
00476   globfree(&pglob);
00477   return 0;
00478 }
00479 
00492 static int
00493 gs_find_and_remove_retrieved_results(gs_server_t *serv, char *sfile)
00494 {
00495   char *globpattern, temp_cid[CID_LEN*2+1];
00496   int globflags = GLOB_NOSORT;
00497   glob_t *pglob;
00498   int i, rv;
00499 
00500   proxy_cid_to_str(temp_cid, serv->componentid);
00501 
00502   globpattern = dstring_sprintf(GSREQUEST_ALL_GLOB_PATTERN, temp_cid);
00503 
00504   if(!globpattern) {
00505     ERRPRINTF("Error creating glob pattern\n");
00506     return -1;
00507   }
00508 
00509   pglob = (glob_t*)malloc(sizeof(glob_t));
00510 
00511   if(!pglob || !sfile) {
00512     free(globpattern);
00513     return -1;
00514   }
00515 
00516   if(glob(globpattern, globflags, NULL, pglob) != 0) {
00517     free(globpattern);
00518     globfree(pglob);
00519     free(pglob);
00520     return 0;
00521   }
00522 
00523   for (i=0; i<pglob->gl_pathc; i++) {
00524     struct stat stbuf;
00525     char *rfile;
00526 
00527     rfile = malloc(strlen(pglob->gl_pathv[i]) + strlen(sfile) + 2);
00528     if(!rfile) {
00529       ERRPRINTF("Couldn't malloc space for filename\n");
00530       free(globpattern);
00531       globfree(pglob);
00532       free(pglob);
00533       return -1;
00534     }
00535 
00536     sprintf(rfile, "%s/%s", pglob->gl_pathv[i], sfile);
00537 
00538     rv = stat(rfile, &stbuf);
00539 
00540     free(rfile);
00541 
00542     if(rv == 0) {
00543       if(gs_remove_output_data(pglob->gl_pathv[i]) < 0) {
00544         ERRPRINTF("Unable to remove output for '%s'\n", pglob->gl_pathv[i]);
00545         continue;
00546       }
00547     }
00548     else if(serv->output_ttl > 0)
00549       gs_remove_results_if_too_old(serv, pglob->gl_pathv[i]);
00550   }
00551 
00552   free(globpattern);
00553   globfree(pglob);
00554   free(pglob);
00555 
00556   return 0;
00557 }
00558 
/*
 * gs_remove_output_data
 *
 * Recursively deletes the directory 'request_id' and everything inside it.
 *
 * The function works by chdir()ing into request_id and back out with
 * chdir(".."), so it changes the process-wide working directory while it
 * runs.  request_id is resolved relative to the current working directory
 * and should be a single path component.
 * NOTE(review): with a multi-component path, chdir("..") would not return
 * to the starting directory.
 * NOTE(review): stat() follows symbolic links, so a symlink to a directory
 * inside the tree would be descended into; lstat() would avoid that.
 *
 * Returns 0 if everything was removed, -1 on any failure.  Entries that
 * cannot be stat()ed are skipped; the final rmdir() then fails and the
 * function reports -1.
 */
static int
gs_remove_output_data(char *request_id)
{
  struct dirent *dp;
  DIR *dirp;
  int rv;

  /* Uncomment for debugging purposes */
  /* The following line will prevent the output directory from being removed */
  /* return -1; */

  rv = 0;

  if(chdir(request_id) < 0)
    return -1;

  dirp = opendir(".");

  if(dirp) {
    while((dp = readdir(dirp)) != NULL) {
      struct stat buf;

      if(!strcmp(dp->d_name, ".") || !strcmp(dp->d_name, ".."))
        continue;

      if(stat(dp->d_name, &buf) != 0)
        continue;

      /* S_ISDIR checks the full file-type field; testing the S_IFDIR bit
       * alone also matches sockets and block devices */
      if(S_ISDIR(buf.st_mode))
        rv |= gs_remove_output_data(dp->d_name);
      else
        rv |= unlink(dp->d_name);
    }

    closedir(dirp);
  }
  else
    rv = -1;   /* could not enumerate the contents */

  /* if we cannot get back out, rmdir(request_id) would resolve against
   * the wrong directory -- stop here */
  if(chdir("..") < 0)
    return -1;

  if(rmdir(request_id) < 0)
    return -1;

  return rv;
}