gs_agent_sensor.c

Go to the documentation of this file.
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>

#include "comm_encode.h"
#include "comm_basics.h"
#include "comm_encode.h"
#include "utility.h"
#include "mfork.h"
#include "icl_list.h"
#include "agent.h"
#include "gs_storage.h"
00019 
/* Forward declaration: gs_sensor_handle_connection() calls this before its
 * definition appears further down in the file. */
int gs_sensor_handle_serverlist(int myfd);
00022 
00029 static void
00030 gs_sensor_generic_signal_handler(int sig)
00031 {
00032   /* pass along SIGHUP to parent */
00033   if(sig == SIGHUP) {
00034     kill(getppid(), SIGHUP);
00035     return;
00036   }
00037 
00038   ERRPRINTF("Sensor terminating on signal %d.\n", sig);
00039   unlink(GRIDSOLVE_SENSOR_USOCK);
00040   _exit(0);
00041 }
00042 
00052 int 
00053 gs_sensor_update_all(gs_agent_conn_t *clients, char *msg)
00054 {
00055   int i;
00056 
00057   if(!clients || !msg) {
00058     ERRPRINTF("Invalid args\n");
00059     return -1;
00060   }
00061 
00062   for(i=0;i<=clients->maxfd;i++)
00063     if(clients->fd[i])
00064       gs_writen(i, msg, strlen(msg));
00065 
00066   return 0;
00067 }
00068 
/**
 * Send an integer as decimal text followed by a newline.
 *
 * @param sock  descriptor to write to
 * @param num   value to send
 * @return 0 if the whole line was written, -1 if gs_writen() failed
 */
int
gs_write_int_nl(int sock, int num)
{
  char text[256];

  snprintf(text, sizeof(text), "%d\n", num);

  if(gs_writen(sock, text, strlen(text)) < 0)
    return -1;

  return 0;
}
00086 
/**
 * Send a string followed by a newline.
 *
 * The newline is only attempted if the string itself was written
 * successfully.
 *
 * @param sock  descriptor to write to
 * @param msg   NUL-terminated text to send; NULL is rejected
 * @return 0 on success, -1 on a NULL msg or any write failure
 */
int
gs_write_nl(int sock, char *msg)
{
  int status = 0;

  if(msg == NULL) {
    ERRPRINTF("Invalid arg: null msg\n");
    return -1;
  }

  if(gs_writen(sock, msg, strlen(msg)) < 0)
    status = -1;
  else if(gs_writen(sock, "\n", 1) < 0)
    status = -1;

  return status;
}
00111 
00124 int
00125 gs_sensor_get_line(int myfd, char *s, int max)
00126 {
00127   int len;
00128 
00129   if(!s) {
00130     ERRPRINTF("Invalid args: null buffer\n");
00131     return -1;
00132   }
00133 
00134   len = proxy_read_timeout(myfd, s, max-1, PROXY_TIMEOUT_DEFAULT);
00135 
00136   if(len <= 0)
00137     return -1;
00138   else if (len <= max)
00139     s[len] = '\0';
00140 
00141   return 0;
00142 }
00143 
00150 void 
00151 gs_sensor_send_list(int socket)
00152 {
00153   int cnt, i;
00154   gs_server_t **servers;
00155   char temp_cid[CID_LEN * 2 + 1];
00156   
00157   if(gs_get_all_servers(NULL, &servers, &cnt) < 0)
00158   {
00159     ERRPRINTF("Sensor can't get server list.\n");
00160     
00161     /*
00162     ** send empty list to client
00163     */
00164     cnt = -1;
00165   } 
00166     
00167   DBGPRINTF("CNT: %d. SOCKET: %d\n", cnt, socket);
00168 
00169   /*
00170   ** send server list to client
00171   */
00172   i = gs_write_int_nl(socket, cnt);
00173   for(i = 0; i < cnt; i++)
00174   {
00175     struct in_addr tmpaddr;
00176 
00177     gs_write_nl(socket, servers[i]->hostname);
00178     proxy_cid_to_str(temp_cid, servers[i]->componentid);
00179     gs_write_nl(socket, temp_cid);
00180     tmpaddr.s_addr = servers[i]->ipaddress;
00181     gs_write_nl(socket, inet_ntoa(tmpaddr));
00182     gs_write_int_nl(socket, servers[i]->port);
00183     gs_write_int_nl(socket, servers[i]->workload);
00184     gs_write_nl(socket, "standard");
00185     gs_write_nl(socket, "0");
00186     gs_write_nl(socket, "0");
00187   }
00188 
00189   for(i = 0; i < cnt; i++)
00190     gs_server_free(servers[i]);
00191   FREE(servers);
00192 }
00193 
00203 int
00204 gs_sensor_handle_connection(int sensorsock, int *fd)
00205 {
00206   struct sockaddr_in sclient;
00207   int myfd, len;
00208   char *tok;
00209   char s[GS_SENSOR_MAX_LINE];
00210 
00211   if(!fd) {
00212     ERRPRINTF("Invalid arg: null fd\n");
00213     return -1;
00214   }
00215 
00216   DBGPRINTF("Received new connection from Agent.\n");
00217   len = sizeof(struct sockaddr_in);
00218   myfd = accept(sensorsock, (struct sockaddr *) &sclient, 
00219      (socklen_t *)&len);
00220   if(myfd < 0) {
00221     if(errno == EINTR) {
00222       return -1;
00223     }
00224     else {
00225       perror("accept");
00226       return -1;
00227     }
00228   }
00229 
00230   /* send sensor version */
00231   gs_write_nl(myfd, "1.3");
00232 
00233   /*
00234   ** Handle message.
00235   */
00236   if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00237     close(myfd);
00238     return -1;
00239   }
00240 
00241   DBGPRINTF("READ: %s\n", s);
00242   tok = strtok(s, " \n\r");
00243 
00244   if(!tok) {
00245     DBGPRINTF("Badly formed request.\n");
00246     close(myfd);
00247     return -1;
00248   }
00249 
00250   if(!strcmp(tok, "NS_SERVERLIST"))
00251     gs_sensor_handle_serverlist(myfd);
00252   else {
00253     DBGPRINTF("Unknown request.\n");
00254     close(myfd);
00255     return -1;
00256   }
00257 
00258   *fd = myfd;
00259   return 0;
00260 }
00261 
00271 int
00272 gs_sensor_handle_disconnect(int myfd, gs_agent_conn_t *connections)
00273 {
00274   char s[GS_SENSOR_MAX_LINE];
00275 
00276   if(!connections) {
00277     ERRPRINTF("Invalid arg: null connections ptr\n");
00278     return -1;
00279   }
00280 
00281   DBGPRINTF("DISCONNECT\n");
00282   gs_write_nl(myfd, "DISCONNECTING");
00283 
00284   if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00285     close(myfd);
00286     return -1;
00287   }
00288 
00289   /* remove connection */
00290   gs_agent_del_conn(connections, myfd);
00291   close(myfd);
00292 
00293   DBGPRINTF("Sensor removed client %d.\n", myfd);
00294 
00295   return 0;
00296 }
00297 
00307 int
00308 gs_sensor_handle_serverlist(int myfd)
00309 {
00310   char s[GS_SENSOR_MAX_LINE];
00311 
00312   DBGPRINTF("NS_SERVERLIST\n");
00313 
00314   /* send md5 hash */
00315   gs_write_nl(myfd, "00000000000000000000000000000000");
00316 
00317   /* get response (ignored) */
00318   if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00319     close(myfd);
00320     return -1;
00321   }
00322 
00323   gs_write_nl(myfd, "ACCEPT");
00324   gs_write_int_nl(myfd, myfd);
00325 
00326   /* send sensor version */
00327   gs_write_nl(myfd, "-2");
00328   gs_write_nl(myfd, "1.3");
00329   gs_write_nl(myfd, "3.0");
00330 
00331   /* send server list */
00332   gs_sensor_send_list(myfd);
00333 
00334   DBGPRINTF("BACK FROM gs_sensor_send_list.\n");
00335 
00336   return 0;
00337 }
00338 
/**
 * Answer a PING request by sending "PONG" plus a newline.
 *
 * @param myfd  client descriptor
 * @return 0 if the reply was written, -1 otherwise
 */
int
gs_sensor_handle_ping(int myfd)
{
  int rc;

  DBGPRINTF("PING\n");

  rc = gs_write_nl(myfd, "PONG");
  if(rc < 0)
    return -1;

  return 0;
}
00355 
00366 int
00367 gs_sensor_process_agent_message(gs_agent_conn_t *connections, int myfd)
00368 {
00369   char s[GS_SENSOR_MAX_LINE];
00370 
00371   if(!connections) {
00372     ERRPRINTF("Invalid arg: null connections ptr\n");
00373     return -1;
00374   }
00375 
00376   DBGPRINTF("Processing request on fd %d\n", myfd);
00377 
00378   /*
00379    * Handle message.
00380    */
00381   if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00382     close(myfd);
00383     return -1;
00384   }
00385 
00386   DBGPRINTF("READ: %s\n", s);
00387 
00388   return gs_sensor_update_all(connections, s);
00389 }
00390 
00401 int
00402 gs_sensor_process_request(int myfd, gs_agent_conn_t *connections)
00403 {
00404   /* to keep purify quiet... ugh. */
00405   char tmp_svlist[] = {"NS_SERVERSIST"}, tmp_ping[] = {"PING"}, 
00406      tmp_dis[] = {"DISCONNECT"}, tmp_0900[] = {"0900"};
00407   char s[GS_SENSOR_MAX_LINE], *tok;
00408 
00409   if(!connections) {
00410     ERRPRINTF("Invalid arg: null connections ptr\n");
00411     return -1;
00412   }
00413 
00414   DBGPRINTF("Processing request on fd %d\n", myfd);
00415 
00416   /*
00417    * Handle message.
00418    */
00419   if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00420     close(myfd);
00421     return -1;
00422   }
00423 
00424   DBGPRINTF("READ: %s\n", s);
00425 
00426   tok = strtok(s, " \n\r");
00427   if(!tok) 
00428     return -1;
00429 
00430   DBGPRINTF("request tok = '%s'\n", tok);
00431 
00432   if(!strcmp(tok, tmp_svlist))
00433     return gs_sensor_handle_serverlist(myfd);
00434   else if(!strcmp(tok, tmp_ping))
00435     return gs_sensor_handle_ping(myfd);
00436   else if(!strcmp(tok, tmp_dis))
00437     return gs_sensor_handle_disconnect(myfd, connections);
00438   else if(!strcmp(tok, tmp_0900))
00439     return 0;
00440   else
00441     ERRPRINTF("Unknown message type, tok = %s\n", tok);
00442 
00443   return 0;
00444 }
00445 
00456 int
00457 gs_agent_sensor_process_messages(int agentsock, int sensorsock)
00458 {
00459   int maxfd, nready, myfd;
00460   fd_set allset, rset;
00461   struct timeval tv;
00462   gs_agent_conn_t conns;
00463 
00464   gs_agent_init_conns(&conns);
00465 
00466   /* 
00467    * Listen for connections and service requests
00468    */
00469   DBGPRINTF("AGENT SENSOR RUNNING. PID: %d\n", (int)getpid());
00470   while(1) {
00471     gs_agent_setup_fd_sets(&conns, sensorsock, agentsock, &allset, &maxfd);
00472     rset = allset;
00473     tv.tv_sec = 1;
00474     tv.tv_usec = 0;
00475     myfd = -1;
00476 
00477     nready = select(maxfd + 1, &rset, NULL, NULL, &tv);
00478 
00479     if((nready < 0) && (errno == EINTR))
00480       continue;
00481 
00482     if(nready < 0) {
00483       ERRPRINTF("select failed.. aborting.\n");
00484       break;
00485     }
00486 
00487     if(nready == 0) {
00488       if(!mfork_check_parent()) {
00489         ERRPRINTF("Parent died, so I am exiting\n");
00490         break;
00491       }
00492       continue;
00493     }
00494     DBGPRINTF("FDs ready: %d.\n", nready);
00495 
00496     /*
00497     ** Handle new connection.
00498     **/
00499     if(FD_ISSET(sensorsock, &rset)) {
00500       if(gs_sensor_handle_connection(sensorsock, &myfd) < 0) {
00501         ERRPRINTF("Error handling connection on sensor socket.\n");
00502         continue;
00503       }
00504 
00505       if(gs_agent_add_conn(&conns, myfd) < 0) {
00506         ERRPRINTF("Could not add connection for fd %d.\n", myfd);
00507         close(myfd);
00508         continue;
00509       }
00510 
00511       DBGPRINTF("Accepted Connection on fd %d.\n", myfd);
00512       continue;
00513     } 
00514     else if(FD_ISSET(agentsock, &rset)) {
00515       gs_sensor_process_agent_message(&conns, agentsock);
00516       continue;
00517     } 
00518     else {
00519       int i;
00520       myfd = -1;
00521 
00522       /*
00523        * Find the FD that's ready
00524        */
00525       for(i=0;i<=conns.maxfd;i++) {
00526         if(conns.fd[i]) {
00527           if(FD_ISSET(i, &rset)) {
00528             myfd = i;
00529             DBGPRINTF("FD %d is ready to be read.\n", myfd);
00530             break;
00531           }
00532         }
00533       }
00534     }
00535 
00536     /*
00537      * Just in case we had an accept error above.
00538      */
00539     if(myfd < 0) {
00540       DBGPRINTF("Accept failed or couldn't find fd: %d.\n", myfd);
00541       continue;
00542     }
00543 
00544     DBGPRINTF("MyFD = %d.\n", myfd);
00545 
00546     if(gs_sensor_process_request(myfd, &conns) < 0) {
00547       gs_agent_del_conn(&conns, myfd);
00548       close(myfd);
00549     }
00550   }
00551 
00552   return 0;
00553 }
00554 
00565 void 
00566 gs_agent_sensor_run(void **args)
00567 {
00568   int *notification_pipe, dump, len, sensorsock, agentsock, tmpsock;
00569   struct sockaddr_un mysaddr;
00570   struct sockaddr_in sclient;
00571   char junk = 'x';
00572   in_port_t port;
00573 
00574   if(!args || !args[0] || !args[1]) {
00575     ERRPRINTF("NULL args.. aborting.\n");
00576     _exit(-1);
00577   }
00578 
00579   DBGPRINTF("In sensor.\n");
00580   notification_pipe = (int *)args[0];
00581 
00582   /* handle all signals except SIGINT.  if the agent is running in
00583    * console mode, we only want the parent to get SIGINT, so that all
00584    * the children will realize the parent is dead via mfork_check_parent()
00585    * and cleanly terminate.  having all processes catch SIGINT causes
00586    * mfork to try to restart them, depending on whether they catch it
00587    * before or after the parent.
00588    */
00589   gs_setup_signal_handlers(gs_sensor_generic_signal_handler);
00590   signal (SIGINT, SIG_IGN);
00591 
00592   /*
00593    * Setup Client Socket 
00594    */
00595   port = getenv_int("GRIDSOLVE_SENSOR_PORT", GRIDSOLVE_SENSOR_PORT_DEFAULT);
00596 
00597   sensorsock = gs_establish_socket(&port, 0);
00598   if(sensorsock < 0) {
00599     ERRPRINTF("Could not open sensor socket.\n");
00600     _exit(-1);
00601   }
00602   gs_listen_on_socket(sensorsock);
00603   DBGPRINTF("Sensor Listening Socket Created on %d.\n", port);
00604 
00605   /*
00606    * Connect to DB manager.
00607    */
00608   if(gs_storage_init(args[1]) < 0) {
00609     ERRPRINTF("Sensor could not connect to DB.\n");
00610     _exit(-1);
00611   }
00612   DBGPRINTF("Sensor Connected to DB Manager.\n");
00613 
00614   /* 
00615    * setup Unix Domain Socket 
00616    */
00617   tmpsock = socket(PF_UNIX, SOCK_STREAM, 0);
00618   if(tmpsock < 0) {
00619     ERRPRINTF("Could not create agent<->sensor socket.\n");
00620     perror("socket");
00621     _exit(-1);
00622   }
00623 
00624   memset(&mysaddr, 0x0, sizeof(struct sockaddr_un));
00625   mysaddr.sun_family = PF_UNIX;
00626   strcpy(mysaddr.sun_path, GRIDSOLVE_SENSOR_USOCK);
00627   unlink(mysaddr.sun_path);
00628 
00629   if(bind(tmpsock, (struct sockaddr *) &mysaddr, 
00630           sizeof(struct sockaddr_un)) < 0 ) {
00631     ERRPRINTF("Could not bind sensor UDS.\n");
00632     perror("bind");
00633     _exit(-1);
00634   }
00635 
00636   if(listen(tmpsock, GRIDSOLVE_AGENT_MAX_CONNECTIONS) < 0) {
00637     ERRPRINTF("Could not listen on sensor UDS.\n");
00638     perror("listen");
00639     _exit(-1);
00640   }
00641 
00642   /* don't check for errors on the write() here because in the 
00643    * event that this is restarted by mfork, the other end of 
00644    * this pipe will be closed already.
00645    */
00646   DBGPRINTF("Writing 1.\n");
00647   dump = write(notification_pipe[1], &junk, 1);
00648   DBGPRINTF("Write returned %d.\n", dump);
00649   close(notification_pipe[1]);
00650   close(notification_pipe[0]);
00651 
00652   /* wait for agent to connect. */
00653   len = sizeof(struct sockaddr_in);
00654   agentsock = accept(tmpsock, (struct sockaddr *) &sclient, 
00655       (socklen_t *)&len);
00656   if(agentsock < 0) {
00657     ERRPRINTF("Could not accept connection from agent to sensor.\n");
00658     perror("accept");
00659     _exit(-1);
00660   }
00661   DBGPRINTF("Agent<->Sensor Socket Created.\n");
00662   close(tmpsock);
00663 
00664   gs_agent_sensor_process_messages(agentsock, sensorsock);
00665 
00666   close(agentsock);
00667   close(sensorsock);
00668   unlink(GRIDSOLVE_SENSOR_USOCK);
00669 }
00670 
/**
 * Prepare the notification pipe the sensor uses to report that it has
 * started.
 *
 * On success a heap-allocated int[2] holding the pipe descriptors is
 * stored in args[0]. On failure args[0] is left unchanged and nothing is
 * leaked.
 *
 * @param args  argument vector shared with the other sensor callbacks;
 *              NULL is ignored
 */
void
gs_agent_sensor_pre(void **args)
{
  int *pfds;

  if(!args)
    return;

  pfds = malloc(2 * sizeof(*pfds));

  if(!pfds) {
    ERRPRINTF("out of memory.\n");
    return;
  }

  if(pipe(pfds) < 0) {
    ERRPRINTF("Cannot create pipe.\n");
    free(pfds);
    return;
  }

  args[0] = pfds;
}
00702 
/**
 * Wait until the sensor reports that it has started.
 *
 * Closes this side's write end of the notification pipe (so a read sees
 * EOF if the other side closes without writing), blocks for one byte,
 * then closes the read end. Both closes happen on every path, and each
 * closed slot in args[0] is set to -1. This lets any later user of the
 * same array, presumably a restarted sensor via mfork (confirm), recognise
 * that the descriptor is gone instead of closing a reused number.
 *
 * @param args  args[0] must be the int[2] created by gs_agent_sensor_pre()
 */
void
gs_agent_sensor_post(void **args)
{
  int *pfds;
  ssize_t nread;
  char junk;
 
  if(!args || !args[0]) {
    ERRPRINTF("error waiting for sensor: NULL args.\n");
    return;
  }

  pfds = (int *)args[0];

  close(pfds[1]);
  pfds[1] = -1;

  /* retry if a signal interrupts the wait */
  do {
    nread = read(pfds[0], &junk, 1);
  } while(nread < 0 && errno == EINTR);

  if(nread < 0)
    ERRPRINTF("Error waiting for sensor to start.\n");
  else if(nread == 0)
    ERRPRINTF("Sensor closed the notification pipe without signalling.\n");
  else
    DBGPRINTF("read returned %d.\n", (int)nread);

  close(pfds[0]);
  pfds[0] = -1;
}
00734 
00743 void
00744 gs_agent_sensor_exit(void **args)
00745 {
00746   unlink(GRIDSOLVE_SENSOR_USOCK);
00747 }