00001 #include <sys/types.h>
00002 #include <sys/socket.h>
00003 #include <sys/time.h>
00004 #include <sys/un.h>
00005 #include <stdio.h>
00006 #include <string.h>
00007 #include <stdlib.h>
00008 #include <errno.h>
00009 #include <unistd.h>
00010
00011 #include "comm_encode.h"
00012 #include "comm_basics.h"
00013 #include "comm_encode.h"
00014 #include "utility.h"
00015 #include "mfork.h"
00016 #include "icl_list.h"
00017 #include "agent.h"
00018 #include "gs_storage.h"
00019
00020 int
00021 gs_sensor_handle_serverlist(int);
00022
00029 static void
00030 gs_sensor_generic_signal_handler(int sig)
00031 {
00032
00033 if(sig == SIGHUP) {
00034 kill(getppid(), SIGHUP);
00035 return;
00036 }
00037
00038 ERRPRINTF("Sensor terminating on signal %d.\n", sig);
00039 unlink(GRIDSOLVE_SENSOR_USOCK);
00040 _exit(0);
00041 }
00042
00052 int
00053 gs_sensor_update_all(gs_agent_conn_t *clients, char *msg)
00054 {
00055 int i;
00056
00057 if(!clients || !msg) {
00058 ERRPRINTF("Invalid args\n");
00059 return -1;
00060 }
00061
00062 for(i=0;i<=clients->maxfd;i++)
00063 if(clients->fd[i])
00064 gs_writen(i, msg, strlen(msg));
00065
00066 return 0;
00067 }
00068
/**
 * Writes an integer in decimal text form, followed by a newline, to a
 * socket.
 *
 * @param sock  descriptor to write to
 * @param num   value to send
 *
 * @return 0 on success, -1 if the write failed.
 */
int
gs_write_int_nl(int sock, int num)
{
  char line[256];

  /* An int plus newline always fits comfortably in the buffer. */
  snprintf(line, sizeof(line), "%d\n", num);

  if(gs_writen(sock, line, strlen(line)) < 0)
    return -1;

  return 0;
}
00086
/**
 * Writes a string followed by a newline to a socket.
 *
 * @param sock  descriptor to write to
 * @param msg   NUL-terminated text to send (the terminator is not sent)
 *
 * @return 0 on success, -1 if msg is NULL or either write failed.
 */
int
gs_write_nl(int sock, char *msg)
{
  if(msg == NULL) {
    ERRPRINTF("Invalid arg: null msg\n");
    return -1;
  }

  /* Payload first ... */
  if(gs_writen(sock, msg, strlen(msg)) < 0)
    return -1;

  /* ... then the line terminator. */
  if(gs_writen(sock, "\n", 1) < 0)
    return -1;

  return 0;
}
00111
00124 int
00125 gs_sensor_get_line(int myfd, char *s, int max)
00126 {
00127 int len;
00128
00129 if(!s) {
00130 ERRPRINTF("Invalid args: null buffer\n");
00131 return -1;
00132 }
00133
00134 len = proxy_read_timeout(myfd, s, max-1, PROXY_TIMEOUT_DEFAULT);
00135
00136 if(len <= 0)
00137 return -1;
00138 else if (len <= max)
00139 s[len] = '\0';
00140
00141 return 0;
00142 }
00143
00150 void
00151 gs_sensor_send_list(int socket)
00152 {
00153 int cnt, i;
00154 gs_server_t **servers;
00155 char temp_cid[CID_LEN * 2 + 1];
00156
00157 if(gs_get_all_servers(NULL, &servers, &cnt) < 0)
00158 {
00159 ERRPRINTF("Sensor can't get server list.\n");
00160
00161
00162
00163
00164 cnt = -1;
00165 }
00166
00167 DBGPRINTF("CNT: %d. SOCKET: %d\n", cnt, socket);
00168
00169
00170
00171
00172 i = gs_write_int_nl(socket, cnt);
00173 for(i = 0; i < cnt; i++)
00174 {
00175 struct in_addr tmpaddr;
00176
00177 gs_write_nl(socket, servers[i]->hostname);
00178 proxy_cid_to_str(temp_cid, servers[i]->componentid);
00179 gs_write_nl(socket, temp_cid);
00180 tmpaddr.s_addr = servers[i]->ipaddress;
00181 gs_write_nl(socket, inet_ntoa(tmpaddr));
00182 gs_write_int_nl(socket, servers[i]->port);
00183 gs_write_int_nl(socket, servers[i]->workload);
00184 gs_write_nl(socket, "standard");
00185 gs_write_nl(socket, "0");
00186 gs_write_nl(socket, "0");
00187 }
00188
00189 for(i = 0; i < cnt; i++)
00190 gs_server_free(servers[i]);
00191 FREE(servers);
00192 }
00193
00203 int
00204 gs_sensor_handle_connection(int sensorsock, int *fd)
00205 {
00206 struct sockaddr_in sclient;
00207 int myfd, len;
00208 char *tok;
00209 char s[GS_SENSOR_MAX_LINE];
00210
00211 if(!fd) {
00212 ERRPRINTF("Invalid arg: null fd\n");
00213 return -1;
00214 }
00215
00216 DBGPRINTF("Received new connection from Agent.\n");
00217 len = sizeof(struct sockaddr_in);
00218 myfd = accept(sensorsock, (struct sockaddr *) &sclient,
00219 (socklen_t *)&len);
00220 if(myfd < 0) {
00221 if(errno == EINTR) {
00222 return -1;
00223 }
00224 else {
00225 perror("accept");
00226 return -1;
00227 }
00228 }
00229
00230
00231 gs_write_nl(myfd, "1.3");
00232
00233
00234
00235
00236 if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00237 close(myfd);
00238 return -1;
00239 }
00240
00241 DBGPRINTF("READ: %s\n", s);
00242 tok = strtok(s, " \n\r");
00243
00244 if(!tok) {
00245 DBGPRINTF("Badly formed request.\n");
00246 close(myfd);
00247 return -1;
00248 }
00249
00250 if(!strcmp(tok, "NS_SERVERLIST"))
00251 gs_sensor_handle_serverlist(myfd);
00252 else {
00253 DBGPRINTF("Unknown request.\n");
00254 close(myfd);
00255 return -1;
00256 }
00257
00258 *fd = myfd;
00259 return 0;
00260 }
00261
00271 int
00272 gs_sensor_handle_disconnect(int myfd, gs_agent_conn_t *connections)
00273 {
00274 char s[GS_SENSOR_MAX_LINE];
00275
00276 if(!connections) {
00277 ERRPRINTF("Invalid arg: null connections ptr\n");
00278 return -1;
00279 }
00280
00281 DBGPRINTF("DISCONNECT\n");
00282 gs_write_nl(myfd, "DISCONNECTING");
00283
00284 if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00285 close(myfd);
00286 return -1;
00287 }
00288
00289
00290 gs_agent_del_conn(connections, myfd);
00291 close(myfd);
00292
00293 DBGPRINTF("Sensor removed client %d.\n", myfd);
00294
00295 return 0;
00296 }
00297
00307 int
00308 gs_sensor_handle_serverlist(int myfd)
00309 {
00310 char s[GS_SENSOR_MAX_LINE];
00311
00312 DBGPRINTF("NS_SERVERLIST\n");
00313
00314
00315 gs_write_nl(myfd, "00000000000000000000000000000000");
00316
00317
00318 if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00319 close(myfd);
00320 return -1;
00321 }
00322
00323 gs_write_nl(myfd, "ACCEPT");
00324 gs_write_int_nl(myfd, myfd);
00325
00326
00327 gs_write_nl(myfd, "-2");
00328 gs_write_nl(myfd, "1.3");
00329 gs_write_nl(myfd, "3.0");
00330
00331
00332 gs_sensor_send_list(myfd);
00333
00334 DBGPRINTF("BACK FROM gs_sensor_send_list.\n");
00335
00336 return 0;
00337 }
00338
/**
 * Answers a PING request by sending the line "PONG".
 *
 * @param myfd  descriptor of the requesting client
 *
 * @return 0 on success, -1 if the reply could not be written.
 */
int
gs_sensor_handle_ping(int myfd)
{
  DBGPRINTF("PING\n");

  if(gs_write_nl(myfd, "PONG") < 0)
    return -1;

  return 0;
}
00355
00366 int
00367 gs_sensor_process_agent_message(gs_agent_conn_t *connections, int myfd)
00368 {
00369 char s[GS_SENSOR_MAX_LINE];
00370
00371 if(!connections) {
00372 ERRPRINTF("Invalid arg: null connections ptr\n");
00373 return -1;
00374 }
00375
00376 DBGPRINTF("Processing request on fd %d\n", myfd);
00377
00378
00379
00380
00381 if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00382 close(myfd);
00383 return -1;
00384 }
00385
00386 DBGPRINTF("READ: %s\n", s);
00387
00388 return gs_sensor_update_all(connections, s);
00389 }
00390
00401 int
00402 gs_sensor_process_request(int myfd, gs_agent_conn_t *connections)
00403 {
00404
00405 char tmp_svlist[] = {"NS_SERVERSIST"}, tmp_ping[] = {"PING"},
00406 tmp_dis[] = {"DISCONNECT"}, tmp_0900[] = {"0900"};
00407 char s[GS_SENSOR_MAX_LINE], *tok;
00408
00409 if(!connections) {
00410 ERRPRINTF("Invalid arg: null connections ptr\n");
00411 return -1;
00412 }
00413
00414 DBGPRINTF("Processing request on fd %d\n", myfd);
00415
00416
00417
00418
00419 if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
00420 close(myfd);
00421 return -1;
00422 }
00423
00424 DBGPRINTF("READ: %s\n", s);
00425
00426 tok = strtok(s, " \n\r");
00427 if(!tok)
00428 return -1;
00429
00430 DBGPRINTF("request tok = '%s'\n", tok);
00431
00432 if(!strcmp(tok, tmp_svlist))
00433 return gs_sensor_handle_serverlist(myfd);
00434 else if(!strcmp(tok, tmp_ping))
00435 return gs_sensor_handle_ping(myfd);
00436 else if(!strcmp(tok, tmp_dis))
00437 return gs_sensor_handle_disconnect(myfd, connections);
00438 else if(!strcmp(tok, tmp_0900))
00439 return 0;
00440 else
00441 ERRPRINTF("Unknown message type, tok = %s\n", tok);
00442
00443 return 0;
00444 }
00445
00456 int
00457 gs_agent_sensor_process_messages(int agentsock, int sensorsock)
00458 {
00459 int maxfd, nready, myfd;
00460 fd_set allset, rset;
00461 struct timeval tv;
00462 gs_agent_conn_t conns;
00463
00464 gs_agent_init_conns(&conns);
00465
00466
00467
00468
00469 DBGPRINTF("AGENT SENSOR RUNNING. PID: %d\n", (int)getpid());
00470 while(1) {
00471 gs_agent_setup_fd_sets(&conns, sensorsock, agentsock, &allset, &maxfd);
00472 rset = allset;
00473 tv.tv_sec = 1;
00474 tv.tv_usec = 0;
00475 myfd = -1;
00476
00477 nready = select(maxfd + 1, &rset, NULL, NULL, &tv);
00478
00479 if((nready < 0) && (errno == EINTR))
00480 continue;
00481
00482 if(nready < 0) {
00483 ERRPRINTF("select failed.. aborting.\n");
00484 break;
00485 }
00486
00487 if(nready == 0) {
00488 if(!mfork_check_parent()) {
00489 ERRPRINTF("Parent died, so I am exiting\n");
00490 break;
00491 }
00492 continue;
00493 }
00494 DBGPRINTF("FDs ready: %d.\n", nready);
00495
00496
00497
00498
00499 if(FD_ISSET(sensorsock, &rset)) {
00500 if(gs_sensor_handle_connection(sensorsock, &myfd) < 0) {
00501 ERRPRINTF("Error handling connection on sensor socket.\n");
00502 continue;
00503 }
00504
00505 if(gs_agent_add_conn(&conns, myfd) < 0) {
00506 ERRPRINTF("Could not add connection for fd %d.\n", myfd);
00507 close(myfd);
00508 continue;
00509 }
00510
00511 DBGPRINTF("Accepted Connection on fd %d.\n", myfd);
00512 continue;
00513 }
00514 else if(FD_ISSET(agentsock, &rset)) {
00515 gs_sensor_process_agent_message(&conns, agentsock);
00516 continue;
00517 }
00518 else {
00519 int i;
00520 myfd = -1;
00521
00522
00523
00524
00525 for(i=0;i<=conns.maxfd;i++) {
00526 if(conns.fd[i]) {
00527 if(FD_ISSET(i, &rset)) {
00528 myfd = i;
00529 DBGPRINTF("FD %d is ready to be read.\n", myfd);
00530 break;
00531 }
00532 }
00533 }
00534 }
00535
00536
00537
00538
00539 if(myfd < 0) {
00540 DBGPRINTF("Accept failed or couldn't find fd: %d.\n", myfd);
00541 continue;
00542 }
00543
00544 DBGPRINTF("MyFD = %d.\n", myfd);
00545
00546 if(gs_sensor_process_request(myfd, &conns) < 0) {
00547 gs_agent_del_conn(&conns, myfd);
00548 close(myfd);
00549 }
00550 }
00551
00552 return 0;
00553 }
00554
00565 void
00566 gs_agent_sensor_run(void **args)
00567 {
00568 int *notification_pipe, dump, len, sensorsock, agentsock, tmpsock;
00569 struct sockaddr_un mysaddr;
00570 struct sockaddr_in sclient;
00571 char junk = 'x';
00572 in_port_t port;
00573
00574 if(!args || !args[0] || !args[1]) {
00575 ERRPRINTF("NULL args.. aborting.\n");
00576 _exit(-1);
00577 }
00578
00579 DBGPRINTF("In sensor.\n");
00580 notification_pipe = (int *)args[0];
00581
00582
00583
00584
00585
00586
00587
00588
00589 gs_setup_signal_handlers(gs_sensor_generic_signal_handler);
00590 signal (SIGINT, SIG_IGN);
00591
00592
00593
00594
00595 port = getenv_int("GRIDSOLVE_SENSOR_PORT", GRIDSOLVE_SENSOR_PORT_DEFAULT);
00596
00597 sensorsock = gs_establish_socket(&port, 0);
00598 if(sensorsock < 0) {
00599 ERRPRINTF("Could not open sensor socket.\n");
00600 _exit(-1);
00601 }
00602 gs_listen_on_socket(sensorsock);
00603 DBGPRINTF("Sensor Listening Socket Created on %d.\n", port);
00604
00605
00606
00607
00608 if(gs_storage_init(args[1]) < 0) {
00609 ERRPRINTF("Sensor could not connect to DB.\n");
00610 _exit(-1);
00611 }
00612 DBGPRINTF("Sensor Connected to DB Manager.\n");
00613
00614
00615
00616
00617 tmpsock = socket(PF_UNIX, SOCK_STREAM, 0);
00618 if(tmpsock < 0) {
00619 ERRPRINTF("Could not create agent<->sensor socket.\n");
00620 perror("socket");
00621 _exit(-1);
00622 }
00623
00624 memset(&mysaddr, 0x0, sizeof(struct sockaddr_un));
00625 mysaddr.sun_family = PF_UNIX;
00626 strcpy(mysaddr.sun_path, GRIDSOLVE_SENSOR_USOCK);
00627 unlink(mysaddr.sun_path);
00628
00629 if(bind(tmpsock, (struct sockaddr *) &mysaddr,
00630 sizeof(struct sockaddr_un)) < 0 ) {
00631 ERRPRINTF("Could not bind sensor UDS.\n");
00632 perror("bind");
00633 _exit(-1);
00634 }
00635
00636 if(listen(tmpsock, GRIDSOLVE_AGENT_MAX_CONNECTIONS) < 0) {
00637 ERRPRINTF("Could not listen on sensor UDS.\n");
00638 perror("listen");
00639 _exit(-1);
00640 }
00641
00642
00643
00644
00645
00646 DBGPRINTF("Writing 1.\n");
00647 dump = write(notification_pipe[1], &junk, 1);
00648 DBGPRINTF("Write returned %d.\n", dump);
00649 close(notification_pipe[1]);
00650 close(notification_pipe[0]);
00651
00652
00653 len = sizeof(struct sockaddr_in);
00654 agentsock = accept(tmpsock, (struct sockaddr *) &sclient,
00655 (socklen_t *)&len);
00656 if(agentsock < 0) {
00657 ERRPRINTF("Could not accept connection from agent to sensor.\n");
00658 perror("accept");
00659 _exit(-1);
00660 }
00661 DBGPRINTF("Agent<->Sensor Socket Created.\n");
00662 close(tmpsock);
00663
00664 gs_agent_sensor_process_messages(agentsock, sensorsock);
00665
00666 close(agentsock);
00667 close(sensorsock);
00668 unlink(GRIDSOLVE_SENSOR_USOCK);
00669 }
00670
/**
 * Creates the notification pipe used to learn when the sensor process is
 * ready, and stores it in args[0].
 *
 * The pipe descriptors live in a heap-allocated int[2].  On any failure
 * args[0] is left unchanged and nothing stays allocated.
 *
 * @param args  argument vector shared with the other sensor callbacks;
 *              args[0] receives the pipe on success
 */
void
gs_agent_sensor_pre(void **args)
{
  int *pfds;

  if(!args)
    return;

  pfds = malloc(2 * sizeof *pfds);

  if(!pfds) {
    ERRPRINTF("out of memory.\n");
    return;
  }

  if(pipe(pfds) < 0) {
    ERRPRINTF("Cannot create pipe.\n");
    free(pfds);   /* do not leak the array when no pipe was created */
    return;
  }

  args[0] = pfds;
}
00702
/**
 * Waits until the sensor process signals that it is ready.
 *
 * Closes the write end of the notification pipe held in args[0] and
 * blocks reading one byte from the read end.  The read end is closed
 * before returning, whether or not the byte arrived.
 *
 * @param args  argument vector whose args[0] is the notification pipe
 *              (int[2]) created by gs_agent_sensor_pre()
 */
void
gs_agent_sensor_post(void **args)
{
  int *pfds;
  ssize_t n;
  char junk;

  if(!args || !args[0]) {
    ERRPRINTF("error waiting for sensor: NULL args.\n");
    return;
  }

  pfds = (int *)args[0];

  /* This side only reads; with the write end closed here, the read below
   * sees end-of-file if the sensor goes away without writing. */
  close(pfds[1]);

  /* A signal arriving while blocked is not a failure: try again. */
  do {
    n = read(pfds[0], &junk, 1);
  } while(n < 0 && errno == EINTR);

  if(n < 0) {
    ERRPRINTF("Error waiting for sensor to start.\n");
  }
  else {
    DBGPRINTF("read returned %d.\n", (int)n);

    /* End-of-file means the pipe was closed without the ready byte. */
    if(n == 0)
      ERRPRINTF("Sensor exited before signalling that it started.\n");
  }

  close(pfds[0]);
}
00734
00743 void
00744 gs_agent_sensor_exit(void **args)
00745 {
00746 unlink(GRIDSOLVE_SENSOR_USOCK);
00747 }