Defines | Functions

workload_report.c File Reference

#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <glob.h>
#include <dirent.h>
#include <errno.h>
#include <utime.h>
#include "server.h"
#include "utility.h"
#include "comm_basics.h"
#include "comm_data.h"
#include "comm_encode.h"
#include "general.h"
#include "gs_pm_model.h"
Include dependency graph for workload_report.c:

Go to the source code of this file.

Defines

#define GSREQUEST_ALL_GLOB_PATTERN   "gsrequest_%s_[0-9]*_*"

Functions

static int gs_send_workload_report (int sock, gs_workload_packet *wp)
 Send workload report.
static int gs_remove_output_data (char *request_id)
 Remove some files that may be left behind.
static int gs_remove_dead_blocking_dirs (gs_server_t *)
static int gs_find_and_remove_retrieved_results (gs_server_t *serv, char *sfile)
 Called periodically to clean up files.
static int gs_prepare_coefficient_updates (gs_server_t *, char **)
void gs_workload_report (void **args)
 Thread to keep server fresh with agent.
int gs_remove_results_if_too_old (gs_server_t *serv, char *reqdir)
static int gs_count_dir_entries (char *name)

Detailed Description

This file contains code to periodically report the workload to the agent.

Definition in file workload_report.c.


Define Documentation

#define GSREQUEST_ALL_GLOB_PATTERN   "gsrequest_%s_[0-9]*_*"

Definition at line 35 of file workload_report.c.


Function Documentation

static int gs_count_dir_entries ( char *  name  )  [static]

Counts the number of entries in the specified directory, including "." and "..".

Parameters:
name - the name of the directory to examine
Returns:
the number of entries in the directory, including "." and ".."; 0 if the directory cannot be opened.

Definition at line 397 of file workload_report.c.

{
  /*
   * Count every entry readdir() yields for the directory, "." and ".."
   * included.  A directory that cannot be opened counts as 0 entries.
   */
  DIR *dir = opendir(name);
  int entries = 0;

  if(dir == NULL)
    return 0;

  while(readdir(dir) != NULL)
    ++entries;

  closedir(dir);
  return entries;
}

Here is the caller graph for this function:

static int gs_find_and_remove_retrieved_results ( gs_server_t *  serv,
char *  sfile 
)

Called periodically to clean up files.

This function will clean up some files that may be left behind after results have been retrieved or cancelled.

Parameters:
serv -- server structure for this server
sfile -- name of the status file to look for ("cancelled", "retrieved", etc)
Returns:
0 on success, -1 on failure.

Definition at line 493 of file workload_report.c.

{
  /*
   * For every request directory belonging to this server (matched by
   * GSREQUEST_ALL_GLOB_PATTERN):
   *   - if "<dir>/<sfile>" exists, remove the whole directory;
   *   - otherwise, if an output TTL is configured, remove it only when it
   *     has expired (gs_remove_results_if_too_old).
   * Returns 0 on success (including "no request directories"), -1 on error.
   */
  char *globpattern, temp_cid[CID_LEN*2+1];
  glob_t pglob;
  size_t i;

  /* validate before allocating anything so no early return can leak */
  if(!sfile)
    return -1;

  proxy_cid_to_str(temp_cid, serv->componentid);

  globpattern = dstring_sprintf(GSREQUEST_ALL_GLOB_PATTERN, temp_cid);

  if(!globpattern) {
    ERRPRINTF("Error creating glob pattern\n");
    return -1;
  }

  /* zeroed so that globfree() is safe even if glob() fails early */
  memset(&pglob, 0, sizeof(pglob));

  if(glob(globpattern, GLOB_NOSORT, NULL, &pglob) != 0) {
    /* no matches (or a glob error): nothing to clean up */
    free(globpattern);
    globfree(&pglob);
    return 0;
  }

  free(globpattern);

  for(i = 0; i < pglob.gl_pathc; i++) {
    char *reqdir = pglob.gl_pathv[i];
    struct stat stbuf;
    char *rfile;
    int rv;

    /* room for "<reqdir>/<sfile>" plus the terminating NUL */
    rfile = malloc(strlen(reqdir) + strlen(sfile) + 2);
    if(!rfile) {
      ERRPRINTF("Couldn't malloc space for filename\n");
      globfree(&pglob);
      return -1;
    }

    sprintf(rfile, "%s/%s", reqdir, sfile);

    rv = stat(rfile, &stbuf);

    free(rfile);

    if(rv == 0) {
      /* status file present: the request is finished with */
      if(gs_remove_output_data(reqdir) < 0)
        ERRPRINTF("Unable to remove output for '%s'\n", reqdir);
    }
    else if(serv->output_ttl > 0)
      gs_remove_results_if_too_old(serv, reqdir);
  }

  globfree(&pglob);

  return 0;
}

Here is the call graph for this function:

Here is the caller graph for this function:

static int gs_prepare_coefficient_updates ( gs_server_t *  gs_server,
char **  msg 
)

Prepares the coefficient updates.

Parameters:
gs_server -- server struct
msg -- upon return, this will contain the update string
Returns:
0 on success, -1 on failure.

Definition at line 161 of file workload_report.c.

{
  /*
   * Build the message "<num_updates>\n<update>..." from every coefficient
   * file that was modified less than GS_UPDATE_FREQUENCY seconds ago.
   * Each update is encoded with gs_encode_model_update() from the problem
   * name (the file's basename up to the first '.') and the file contents.
   * On success *msg receives a freshly allocated string the caller must
   * free.  Returns 0 on success, -1 on failure.
   */
  icl_list_t *coef_files, *l;
  char *cinfo, *update;
  int num_updates;

  num_updates = 0;

  update = dstring_sprintf("");
  if(!update) return -1;

  coef_files = icl_list_new();

  if(coef_files) {
    if(gs_find_coefficient_files(gs_server, coef_files) < 0)
      ERRPRINTF("Error finding coefficient files.\n");

    for(l=icl_list_first(coef_files); l!=NULL; l=icl_list_next(coef_files, l)) {
      char *filename = (char *)l->data;
      char *model_filename, *probname, *expr, *sep;
      size_t fnlen, exprlen;
      long age;
      int mfd;

      age = gs_seconds_since_modified(filename);

      /* skip files not modified within the last GS_UPDATE_FREQUENCY seconds */
      if(age >= GS_UPDATE_FREQUENCY)
        continue;

      /* the model file name replaces the last three characters with "mdl";
       * a shorter name would make that index negative */
      fnlen = strlen(filename);
      if(fnlen < 3) {
        ERRPRINTF("Warning: coefficient file name '%s' is too short\n", filename);
        continue;
      }

      model_filename = strdup(filename);

      if(!model_filename) {
        ERRPRINTF("strdup failed\n");
        icl_list_destroy(coef_files, free);
        free(update);
        return -1;
      }

      memcpy(model_filename + fnlen - 3, "mdl", 3);

      mfd = open(model_filename, O_RDONLY, 0600);

      if(mfd < 0) {
        ERRPRINTF("Warning: failed to open model file '%s'\n",
          model_filename);
        free(model_filename);
        continue;
      }

      free(model_filename);

      /* note we're obtaining a lock on the model file, not the
       * coefficient file.  the model file is the one that will
       * be locked by the service processes when both files are
       * updated.
       */

      if(gs_lock_fd_nowait(mfd, F_RDLCK) < 0) {
        struct timeval time_now[2];

        ERRPRINTF("Skipping update of '%s': don't want to wait for lock\n",
          filename);

        close(mfd);

        /* update timestamp of file so that it will be updated next time:
         * set access and modification times 10 seconds into the future */
        gettimeofday(&time_now[0], NULL);
        time_now[0].tv_sec += 10;
        time_now[1] = time_now[0];

#ifdef __INTERIX
        /* Replacing utimes with utime */
        {
          struct utimbuf time_now_utimbuf;
          time_now_utimbuf.actime = time_now[0].tv_sec;
          time_now_utimbuf.modtime = time_now[1].tv_sec;
          utime(filename, &time_now_utimbuf);
        }
#else
        utimes(filename, time_now);
#endif

        continue;
      }

      if(gs_get_contents_of_file(filename, &expr) < 0) {
        ERRPRINTF("Warning: failed to read coefficient file '%s'\n", filename);
        gs_unlock_fd(mfd);
        close(mfd);
        continue;
      }

      /* contents are in memory now; the lock is no longer needed */
      gs_unlock_fd(mfd);
      close(mfd);

      /* drop one trailing newline; an empty file must not index [-1] */
      exprlen = strlen(expr);
      if((exprlen > 0) && (expr[exprlen-1] == '\n'))
        expr[exprlen-1] = '\0';

      /* problem name = basename up to the first '.'.  This truncates the
       * list entry in place; it is not used again after this point. */
      sep = strrchr(filename, '/');
      probname = sep ? sep + 1 : filename;

      sep = strchr(probname, '.');
      if(sep)
        *sep = '\0';

      if(gs_encode_model_update(&cinfo, probname, expr) == 0) {
        /* don't use append_free */
        update = dstring_append(update, cinfo);
        num_updates++;
      }

      free(expr);
    }

    icl_list_destroy(coef_files, free);
  }

  *msg = dstring_sprintf("%d\n%s", num_updates, update);
  free(update);

  return *msg ? 0 : -1;
}

Here is the call graph for this function:

Here is the caller graph for this function:

static int gs_remove_dead_blocking_dirs ( gs_server_t *  serv  ) 

Removes request subdirectories for blocking jobs that died. Looks for all empty request subdirectories and checks whether the process whose PID is encoded in the subdirectory name is still running. If it is not, deletes the subdirectory and decrements the job count.

Returns:
0 on success, -1 on failure.

Definition at line 426 of file workload_report.c.

{
  /*
   * Scan this server's request directories.  An empty one (only "." and
   * "..") whose owning process (PID parsed from the directory name) no
   * longer exists is removed, and the job count is decremented for it.
   * Returns 0 on success (including "nothing matched"), -1 on error.
   */
  char *globpattern, temp_cid[CID_LEN*2+1];
  glob_t pglob;
  size_t i;

  proxy_cid_to_str(temp_cid, serv->componentid);

  globpattern = dstring_sprintf(GSREQUEST_ALL_GLOB_PATTERN, temp_cid);

  if(!globpattern) {
    ERRPRINTF("Error creating glob pattern\n");
    return -1;
  }

  /* zeroed so that globfree() is safe even if glob() fails early */
  memset(&pglob, 0, sizeof(pglob));

  if(glob(globpattern, GLOB_NOSORT, NULL, &pglob) != 0) {
    free(globpattern);
    globfree(&pglob);
    return 0;
  }

  free(globpattern);

  for(i = 0; i < pglob.gl_pathc; i++) {
    char *reqdir = pglob.gl_pathv[i];
    int pid_to_check, rv;

    /* less than 3 to account for "." and ".." */
    if(gs_count_dir_entries(reqdir) >= 3)
      continue;

    if((pid_to_check = gs_parse_pid_from_requestid(reqdir)) < 0)
      continue;

    rv = kill(pid_to_check, 0);

    /* if kill returns 0 or -1 w/EPERM, then there is probably a process with
     * the given PID running now.  in that case we don't want to try to
     * delete this file since it may be in use.
     */
    if((rv == 0) || ((rv < 0) && (errno == EPERM)))
      continue;

    LOGPRINTF("Removing failed blocking request dir '%s'\n", reqdir);

    /* only account for the job if the directory is really gone; otherwise
     * it would be found (and decremented) again on the next pass */
    if(rmdir(reqdir) < 0) {
      ERRPRINTF("Warning: failed to remove '%s': %s\n", reqdir, strerror(errno));
      continue;
    }

    if(gs_decrement_job_count(GS_SERVER_JOB_COUNT_FILE) < 0)
      ERRPRINTF("Warning: failed to decrement job count.\n");
  }

  globfree(&pglob);
  return 0;
}

Here is the call graph for this function:

Here is the caller graph for this function:

static int gs_remove_output_data ( char *  request_id  ) 

Remove some files that may be left behind.

Periodically remove some files that may be left behind after results have been picked up or the call has been cancelled.

Parameters:
request_id -- the identifier of the request data to be removed
Returns:
0 on success, -1 on failure.

Definition at line 570 of file workload_report.c.

{
  /*
   * Recursively delete the directory request_id and everything in it.
   * Works by chdir()-ing into the directory and back out with "..", so
   * request_id is expected to be a single path component relative to the
   * current working directory.  Returns 0 if everything was removed,
   * -1 if anything could not be removed.
   */
  struct dirent *dp;
  DIR *dirp;
  int rv;

  /* Uncomment for debugging purposes */
  /* The following line will prevent the output directory from being removed */
  /* return -1; */

  rv = 0;

  if(chdir(request_id) < 0)
    return -1;

  dirp = opendir(".");

  /* if the directory cannot be listed, skip straight to rmdir(), which
   * will report failure if anything is left inside */
  if(dirp) {
    while((dp = readdir(dirp)) != NULL) {
      struct stat buf;

      if(!strcmp(dp->d_name, ".") || !strcmp(dp->d_name, ".."))
        continue;

      /* lstat, not stat: a symlink is unlinked itself, never followed.
       * Following a symlinked directory would chdir outside this tree and
       * the chdir("..") on the way back would land somewhere else. */
      if(lstat(dp->d_name, &buf) != 0)
        continue;

      /* S_ISDIR, not "& S_IFDIR": the S_IFDIR bit is also set for
       * sockets and block devices */
      if(S_ISDIR(buf.st_mode))
        rv |= gs_remove_output_data(dp->d_name);
      else
        rv |= unlink(dp->d_name);
    }

    closedir(dirp);
  }

  /* if we cannot get back out, the relative rmdir below would be wrong */
  if(chdir("..") < 0)
    return -1;

  if(rmdir(request_id) < 0)
    return -1;

  return rv;
}

Here is the call graph for this function:

Here is the caller graph for this function:

int gs_remove_results_if_too_old ( gs_server_t *  serv,
char *  reqdir 
)

Removes a gsrequest subdirectory if it was last accessed longer ago than the output TTL specified in the server_config file (no removal when the TTL is not positive).

Parameters:
serv -- server structure for this server
reqdir -- the request subdirectory to be checked
Returns:
0 on success, -1 on failure.

Definition at line 360 of file workload_report.c.

{
  /*
   * Expire reqdir when its last access time is more than serv->output_ttl
   * seconds in the past.  A non-positive TTL disables expiry.
   * Returns -1 if reqdir is NULL, cannot be stat'ed, or cannot be removed;
   * 0 otherwise.
   */
  struct stat dirinfo;
  struct timeval now;
  long idle_secs;

  if(reqdir == NULL || stat(reqdir, &dirinfo) < 0)
    return -1;

  if(serv->output_ttl <= 0)
    return 0;

  gettimeofday(&now, NULL);
  idle_secs = now.tv_sec - dirinfo.st_atime;

  /* still within its time-to-live: keep it */
  if(idle_secs <= serv->output_ttl)
    return 0;

  LOGPRINTF("Removing expired output directory '%s'\n", reqdir);

  if(gs_remove_output_data(reqdir) >= 0)
    return 0;

  ERRPRINTF("Unable to remove output for '%s'\n", reqdir);
  return -1;
}

Here is the call graph for this function:

Here is the caller graph for this function:

static int gs_send_workload_report ( int  sock,
gs_workload_packet *  wp 
) [static]

Send workload report.

Send a workload report to the socket.

Parameters:
sock -- socket to use to communicate with the agent
wp -- pointer to structure containing the workload info to be sent
Returns:
0 on success, -1 on error, -2 when re-registration is required.

Definition at line 294 of file workload_report.c.

{
  /*
   * Protocol exchange with the agent:
   *   send tag wp->msgtype, send VERSION, expect GS_PROT_OK;
   *   send the encoded workload report, send the coefficient update
   *   string ("0" when none), expect GS_PROT_OK.
   * Returns 0 on success, -2 if the agent answers GS_PROT_UNKNOWN_SERVER
   * (the caller must re-register), -1 on any other failure.
   * The encoded message is freed on every path.
   */
  char *msg = NULL, *cu_msg, temp_cid[CID_LEN*2+1];
  int tag, rv = -1;

  proxy_cid_to_str(temp_cid, wp->server_cid);

  if(gs_encode_workload_report(&msg, wp->server_workload, wp->nproblems,
      temp_cid) < 0)
    return -1;

  if((gs_send_tag(sock, wp->msgtype) < 0) ||
     (gs_send_string(sock, VERSION) < 0))
    goto out;

  if(gs_recv_tag(sock, &tag) < 0)
    goto out;

  if(tag != GS_PROT_OK) {
    if(tag == GS_PROT_VERSION_MISMATCH)
      ERRPRINTF("Warning: agent is an incompatible version\n");
    goto out;
  }

  if(gs_send_string(sock, msg) < 0)
    goto out;

  cu_msg = wp->coeff_update ? wp->coeff_update : "0";

  if(gs_send_string(sock, cu_msg) < 0)
    goto out;

  /* final acknowledgement; an unknown-server reply means re-register */
  if(gs_recv_tag(sock, &tag) < 0)
    goto out;

  if(tag != GS_PROT_OK) {
    if(tag == GS_PROT_UNKNOWN_SERVER)
      rv = -2;
    else
      ERRPRINTF("Agent rejected workload report for some unspecified reason\n");
    goto out;
  }

  rv = 0;

out:
  free(msg);
  return rv;
}

Here is the call graph for this function:

Here is the caller graph for this function:

void gs_workload_report ( void **  args  ) 

Thread to keep server fresh with agent.

This function should be spawned as a thread at server start-up. It periodically updates the workload with the agent and prevents the server from being expired from the agent's list.

Parameters:
args -- the arguments specified at the time this was m-forked. Currently only args[0] is expected to be set; it should be a pointer to the server struct.

Definition at line 58 of file workload_report.c.

{
  /*
   * One workload-report cycle:
   *   1. prepare pending coefficient updates;
   *   2. resolve the agent host/port (GRIDSOLVE_AGENT / GRIDSOLVE_AGENT_PORT
   *      env vars, falling back to server_config / the default port);
   *   3. send the workload report, re-registering the server if the agent
   *      no longer knows it;
   *   4. clean up retrieved/cancelled/dead request directories and
   *      register problems.
   * wp.coeff_update is owned by this function and freed on every path.
   */
  gs_server_t *gs_server;
  PROXY_COMPONENTADDR myaddr;
  int sock;
  gs_workload_packet wp;
  int retVal;
  char *portstr;

  if(!args || !args[0])
    return;

  gs_server = args[0];

  wp.coeff_update = NULL;

  gs_prepare_coefficient_updates(gs_server, &wp.coeff_update);

  if(!wp.coeff_update)
    wp.coeff_update = strdup("0");

  wp.agent_host = getenv("GRIDSOLVE_AGENT");

  if(!wp.agent_host)
    wp.agent_host = gs_server->agenthost;

  if(!wp.agent_host) {
    ERRPRINTF("agent host not in server_config or GRIDSOLVE_AGENT env var\n");
    free(wp.coeff_update);
    return;
  }

  portstr = getenv("GRIDSOLVE_AGENT_PORT");

  if(portstr)
    wp.agent_port = atoi(portstr);
  else
    wp.agent_port = GRIDSOLVE_AGENT_PORT_DEFAULT;

  /*
   * Fill in static information
   */

  wp.msgtype = GS_PROT_WORKLOAD_REPORT;

  myaddr = proxy_get_local_addr();
  memcpy(wp.server_cid, myaddr.ID, CID_LEN);

  wp.server_workload = gs_get_workload();
  /* This scaling is used if the GridSolve monitor is having problems seeing the workload */
  /* wp.server_workload = gs_get_workload() * getenv_int("GRIDSOLVE_WORKLOAD_SCALING", 1); */

  if(wp.server_workload < 0) {
    DBGPRINTF("Unable to get workload.  Using default of 50\n");
    wp.server_workload = 50;
  }

  wp.nproblems = gs_server->nproblems;

  sock = gs_connect_direct(wp.agent_host, wp.agent_port);

  if(sock < 0)  {
    DBGPRINTF("Could not connect to agent.\n");
    free(wp.coeff_update);
    return;
  }

  if((retVal = gs_send_workload_report(sock, &wp)) < 0) {
    DBGPRINTF("Error sending workload report.\n");
    if(retVal == -2) {
      /* Since we need to re-register with the server,
       * clear the previous problem list so that all
       * problems are sent.
       */

      if(gs_server->problemlist)
        gs_free_problem(gs_server->problemlist);
      gs_server->problemlist = NULL;

      /* release the agent connection before re-registering, and mark it
       * closed so it is not closed a second time below (the descriptor
       * number may have been reused by then) */
      close(sock);
      sock = -1;
      gs_server_register(gs_server);
    }
  }

  if(sock >= 0)
    close(sock);

  free(wp.coeff_update);
  wp.coeff_update = NULL;

  gs_find_and_remove_retrieved_results(gs_server, "retrieved");
  gs_find_and_remove_retrieved_results(gs_server, "cancelled");
  gs_remove_dead_blocking_dirs(gs_server);

  gs_server_register_problems(gs_server);

  return;
}

Here is the call graph for this function:

Here is the caller graph for this function: