proxy_server.c

Go to the documentation of this file.
00001 
00016 /* $Id: proxy_server.c,v 1.28 2008/12/10 19:02:04 seymour Exp $ */
00017 /* $UTK_Copyright: $ */
00018 
00019 #ifdef HAVE_CONFIG_H
00020 #include "config.h"
00021 #endif /* HAVE_CONFIG_H */
00022 
00023 #ifndef _REENTRANT
00024 #define _REENTRANT
00025 #endif
00026 
00027 #include <pthread.h>
00028 #include <stdio.h>
00029 #include <stdlib.h>
00030 #include <sys/time.h>
00031 #include <sys/types.h>
00032 #include <unistd.h>
00033 #include <sys/socket.h>
00034 #include <netdb.h>
00035 #include <netinet/in.h>
00036 #include <netinet/tcp.h>
00037 #include <arpa/inet.h>
00038 #include <sys/errno.h>
00039 #include <memory.h>
00040 #include <signal.h>
00041 #include <fcntl.h>
00042 #include <errno.h>
00043 #include <sys/ioctl.h>
00044 
00045 #ifdef HAVE_CONFIG_H
00046 #include "config.h"
00047 #endif /* HAVE_CONFIG_H */
00048 
00049 #include "symtab.h"
00050 #include "queue.h"
00051 #include "proxy_utils.h"
00052 #include "proxy_server.h"
00053 
00054 SYMTABLE *accept_table;            /* components with a control connection, keyed by CID/port tag; created in main() */
00055 pthread_mutex_t accept_tab_mutex;  /* held around every accept_table lookup/insert/delete/dump in this file */
00056 QUEUE *removed_components_queue;   /* components retired by proxy_delete_server_tag(); freed by proxy_purge_old_components() */
00057 
00067 void
00068 proxy_print_component_tag(FILE *f, char *prefix, char *ctag, char *suffix)
00069 {
00070   int id_size, port_size;
00071   in_port_t server_port;
00072   CID server_id;
00073 
00074   id_size = sizeof(server_id.id);
00075   port_size = sizeof(server_port);
00076 
00077   memcpy(&server_port, ctag + id_size, port_size);
00078 
00079   if(prefix)
00080     fprintf(f, "%s ", prefix);
00081   proxy_print_componentID(f, ctag);
00082   fprintf(f, "/%d", ntohs(server_port));
00083   if(suffix)
00084     fprintf(f, " %s", suffix);
00085   fprintf(f, "\n");
00086 }
00087 
/**
 * Makes descriptor dst refer to the same open file as src.
 *
 * dup2() atomically replaces dst, so dst is not closed first; when the two
 * descriptors are already the same nothing is done.
 *
 * @return 0 on success, -1 (after perror) if dup2() fails
 */
static int
daemon_remap_fd(int src, int dst)
{
  if(src != dst && dup2(src, dst) < 0) {
    perror("dup2()");
    return -1;
  }

  return 0;
}

/**
 * Turns the calling process into a daemon.
 *
 * Forks twice (calling setsid() in between) with the parent and the first
 * child exiting, changes directory to root, clears the umask, points stdin
 * at /dev/null and points stdout and stderr at a freshly created logfile.
 *
 * @param root     directory to chdir() into
 * @param logfile  file that receives stdout/stderr; any existing file of
 *                 that name is removed first
 *
 * @return 0 in the surviving daemon process on success, -1 on any failure
 *         (bad arguments, fork, chdir, open or dup2).  Note that a failure
 *         after the first fork is reported in a child process, since the
 *         original parent has already exited.
 */
int 
proxy_daemon_init(char *root, char *logfile) {
  pid_t pid;
  int fd, rc;

  if(!root || !logfile)
    return -1;

  pid = fork();

  if(pid < 0)
    return -1;

  if(pid != 0) /* parent exits */
    exit(0);

  setsid();   /* become session leader */

  pid = fork();

  if(pid < 0)
    return -1;

  if(pid != 0) /* 1st child exits */
    exit(0);

  if(chdir(root) < 0) {
    perror("chdir()");
    return -1;
  }
  umask(0);

  /* Remap stdin to /dev/null.  No O_CREAT: the device must already exist. */
  fd = open("/dev/null", O_RDWR);
  if(fd < 0) {
    perror("open()");
    return -1;
  }
  rc = daemon_remap_fd(fd, 0);
  /* fd can only be <= 2 if a standard stream was closed on entry; in that
   * case it now *is* a standard stream and must stay open */
  if(fd > 2)
    close(fd);
  if(rc < 0)
    return -1;

  fprintf(stderr, "\n"
    "==============================================================\n"
    "|       -- Giving up terminal control! --\n|\n"
    "| All further terminal output will be sent to the log file:\n"
    "|\t\"%s\"\n"
    "=============================================================="
    "\n\n", logfile);

  /* Open logfile */
  remove(logfile);
  fd = open(logfile, O_RDWR|O_CREAT, 0644);
  if(fd < 0) {
    perror("open()");
    return -1;
  }

  /* Remap stdout and stderr to logfile.  stderr is remapped last so that a
   * failure on stdout can still be reported on the old stderr. */
  rc = daemon_remap_fd(fd, 1);
  if(rc == 0)
    rc = daemon_remap_fd(fd, 2);

  if(fd > 2)
    close(fd);

  return rc;
}
00180 
00187 int 
00188 proxy_block_sigpipe()
00189 {
00190   sigset_t sigvec;
00191 
00192   if (sigemptyset(&sigvec) < 0 ||
00193       sigaddset(&sigvec, SIGPIPE) < 0 || 
00194       pthread_sigmask(SIG_BLOCK, &sigvec, NULL) != 0) 
00195   {
00196     fprintf(stderr, "Unable to set thread signal mask.  Aborting\n");
00197     return -1;
00198   }
00199 
00200   return 0;
00201 }
00202 
00203 int proxy_krb5_enabled = FALSE;  /* set from the -k option in main(); read by proxy_auth() when built with KERBEROS5 */
00204 
/**
 * Reads an environment variable as a base-10 integer.
 *
 * Leading whitespace and an optional sign are accepted as by strtol();
 * characters following the number are ignored.
 *
 * @param name    name of the environment variable (may be NULL)
 * @param defval  value returned when no usable number is available
 *
 * @return the parsed value, or defval if name is NULL, the variable is
 *         unset, it does not start with a number, or the number does not
 *         fit in an int
 */
static int 
getenv_int(char* name, int defval)
{
  /* INT_MAX / INT_MIN derived locally so no extra header is required */
  const long int_max = (long) (~0U >> 1);
  const long int_min = -int_max - 1;
  char *envstr;
  char *endptr;
  long longval;

  if (name == NULL) return defval;

  /* Env variable does not exist */
  if ((envstr = getenv(name)) == NULL) return defval;

  /* strtol() only ever sets errno, it never clears it, so a stale ERANGE
   * left behind by an earlier call would otherwise reject a valid value */
  errno = 0;
  longval = strtol(envstr, &endptr, 10);

  if (endptr == envstr || errno == ERANGE)
    return defval;

  /* where long is wider than int, refuse values the cast would mangle */
  if (longval > int_max || longval < int_min)
    return defval;

  return (int)longval;
}
00233 
/* when making changes to the command line args, update
 * PROXY_SERVER_USAGE_STR so the usage information is printed
 * correctly upon error.
 */
#define PROXY_SERVER_USAGE_STR "Usage: proxy_server [-l logfile] [-c] [-k]"

/**
 * Parses the proxy server command line.
 *
 * Recognized options:
 *   -l logfile   name of the log file
 *   -c           do not become a daemon
 *   -k           enable Kerberos
 *
 * @param argc      argument count as passed to main()
 * @param argv      argument vector as passed to main()
 * @param logfile   out: heap copy of the -l argument (caller owns it), or
 *                  NULL if -l was not given
 * @param daemon    out: 1 unless -c was given, in which case 0
 * @param kerberos  out: 1 if -k was given, otherwise 0
 *
 * @return 0 on success, -1 on an unrecognized option, a missing option
 *         argument, or failure to copy the log file name
 */
int
proxy_server_parse_cmd_line(int argc, char **argv, char **logfile, 
  int *daemon, int *kerberos)
{
  int c;

  *logfile = NULL;
  *daemon = 1;
  *kerberos = 0;

  /* POSIX specifies -1 (not EOF) as getopt()'s end-of-options value */
  while((c = getopt(argc,argv,"kcl:")) != -1) {
    switch(c) {
      case 'l': {
        char *copy = strdup(optarg);

        if(!copy) {
          fprintf(stderr,"Out of memory copying log file name.\n");
          free(*logfile);
          *logfile = NULL;
          return -1;
        }

        /* a repeated -l replaces the earlier name instead of leaking it */
        free(*logfile);
        *logfile = copy;
        break;
      }
      case 'c':
        *daemon = 0;
        break;
      case 'k':
        *kerberos = 1;
        break;
      case '?':
        /* getopt() has already printed its own diagnostic */
        return -1;
      default:
        fprintf(stderr,"Bad arg: '%c'.\n",c);
        return -1;
    }
  }

  return 0;
}
00289 
00301 int 
00302 main(int argc, char *argv[])
00303 {
00304   int listen_sock, daemon;
00305   int proxy_listen_port = -1;
00306   socklen_t clilen;
00307   struct sockaddr_in tcp_serv, cliaddr;
00308   void *proxy_generic_conn_handler(void *);
00309   void *proxy_purge_old_components(void *);
00310   char *logfile;
00311   pthread_t tid;
00312 
00313   /* Parse command line args */
00314   if(proxy_server_parse_cmd_line(argc, argv, &logfile, &daemon, 
00315        &proxy_krb5_enabled) < 0) 
00316   {
00317     fprintf(stderr, "%s\n", PROXY_SERVER_USAGE_STR);
00318     exit(EXIT_FAILURE);
00319   }
00320 
00321 #ifdef KERBEROS5
00322   if (proxy_check_krb5_envvars() != 0)
00323     exit(EXIT_FAILURE);
00324 #else
00325   if(proxy_krb5_enabled) {
00326     fprintf(stderr, "Warning!  Kerberos was not enabled during compilation ");
00327     fprintf(stderr, "(-k ignored).\n\n");
00328   }
00329 #endif
00330 
00331   /* if logfile wasn't specified on the command line, use default */
00332   if(!logfile)
00333     logfile = PROXY_LOGFILE;
00334 
00335   /* Make current process into a daemon */
00336   if(daemon && proxy_daemon_init(".", logfile) < 0) {
00337     fprintf(stderr, "Failed to start proxy server as a daemon.\n");
00338     exit(EXIT_FAILURE);
00339   }
00340 
00341   if (proxy_block_sigpipe() < 0) {
00342     fprintf(stderr, "Couldn't block SIGPIPE.  Aborting.\n");
00343     exit(EXIT_FAILURE);
00344   }
00345 
00346   removed_components_queue = new_queue();
00347   if (!removed_components_queue) {
00348     fprintf(stderr, "Bad news!  out of memory allocating queue\n");
00349     exit(EXIT_FAILURE);
00350   }
00351 
00352   /* create a hash table containing the components that have connected */
00353   accept_table = new_symtable(PROXY_ACCEPT_TABLE_SIZE);
00354 
00355   if (!accept_table) {
00356     fprintf(stderr, "Bad news!  out of memory allocating table\n");
00357     exit(EXIT_FAILURE);
00358   }
00359 
00360   if (pthread_mutex_init(&accept_tab_mutex, NULL)) {
00361     fprintf(stderr, "Bad news!  cannot create mutex\n");
00362     exit(EXIT_FAILURE);
00363   }
00364 
00365   proxy_listen_port = getenv_int("PROXY_LISTEN_PORT", PROXY_LISTEN_PORT_DEFAULT);
00366   if (proxy_init_sock(&listen_sock, &tcp_serv, "tcp", FALSE, proxy_listen_port) < 0) {
00367     fprintf(stderr, "Failed to create socket\n");
00368     perror("proxy_init_sock");
00369     exit(EXIT_FAILURE);
00370   }
00371 
00372   if(pthread_create(&tid, NULL, &proxy_purge_old_components, NULL)) {
00373     fprintf(stderr, "Warning: error creating new thread.\n");
00374     perror("pthread_create");
00375   }
00376 
00377   if(pthread_detach(tid)) 
00378     fprintf(stderr,"Warning: could not detach thread after creation.\n");
00379 
00380   printf("proxy IP %u\n", proxy_get_my_ipaddr());
00381   printf("proxy listening on port %d\n", ntohs(tcp_serv.sin_port));
00382 
00383   if (listen(listen_sock, 50) < 0) {
00384     perror("listen");
00385     exit(EXIT_FAILURE);
00386   }
00387 
00388   for (;;) {
00389     int *connfd;
00390 
00391     clilen = sizeof(cliaddr);
00392 
00393     /* malloc to avoid race between accept and new thread */
00394     connfd = (int *) malloc(sizeof(int));
00395 
00396     if (!connfd) {
00397       fprintf(stderr, "Cannot malloc.  going to sleep for a while.\n");
00398       sleep(3);
00399       continue;
00400     }
00401 
00402     *connfd = accept(listen_sock, (struct sockaddr *) &cliaddr, &clilen);
00403 
00404     if (*connfd < 0) {
00405       free(connfd);
00406       perror("accept");
00407       continue;
00408     }
00409 
00410     printf("new client connected!\n");
00411 
00412     if (proxy_auth(*connfd) < 0) {
00413       fprintf(stderr, "Warning: failed to authenticate client.\n");
00414       close(*connfd);
00415       continue;
00416     }
00417 
00418     if (pthread_create(&tid, NULL, &proxy_generic_conn_handler, connfd)) {
00419       fprintf(stderr, "Warning: error creating new thread.\n");
00420       close(*connfd);
00421       free(connfd);
00422       continue;
00423     }
00424 
00425     if(pthread_detach(tid)) 
00426       fprintf(stderr,"Warning: could not detach thread after creation.\n");
00427   }
00428 }
00429 
00439 int 
00440 proxy_auth(int s)
00441 {
00442   proxy_tag_t response;
00443 
00444 #ifdef KERBEROS5
00445   if (proxy_krb5_enabled) {
00446     response = PROXY_KRB5_AUTH_REQUIRED;
00447     if (write(s, &response, sizeof(response)) < 0)
00448       return -1;
00449 
00450     /* get credentials from client */
00451     if (proxy_recv_krb5_credentials(s) < 0) {
00452       /* 
00453        * if we didn't like the credentials; tell client that the
00454        * authentication failed, and this will be the error code
00455        * associated with the problem request.  otherwise, fall
00456        * through to other error checking below
00457        */
00458       response = PROXY_AUTH_FAILED;
00459 
00460       if (write(s, &response, sizeof(response)) == -1)
00461     return -1;
00462 
00463       return -1;
00464     }
00465 
00466     response = PROXY_AUTH_ACCEPTED;
00467   } 
00468   else
00469     response = PROXY_AUTH_ACCEPTED;
00470 #else
00471   response = PROXY_AUTH_ACCEPTED;
00472 #endif
00473 
00474   if (write(s, &response, sizeof(response)) < 0)
00475     return -1;
00476 
00477   return 0;
00478 }
00479 
00487 void
00488 dump_component(char *key, void *var)
00489 {
00490   COMPONENT *item = (COMPONENT *)var;
00491 
00492   proxy_print_component_tag(stdout, "COMPONENT key =", key, NULL);
00493 
00494   printf("\tID = ");
00495   proxy_print_componentID(stdout, item->id.id);
00496   printf("\n");
00497   printf("\tport = %d\n", ntohs(item->port));
00498   printf("\tsockfd = %d\n", item->sockfd);
00499 
00500   return;
00501 }
00502 
00515 void *
00516 proxy_purge_old_components(void *arg)
00517 {
00518   COMPONENT *component;
00519 
00520   for(;;) {
00521     sleep(PROXY_PURGE_FREQ);
00522 
00523     if (pthread_mutex_lock(&accept_tab_mutex) == 0) {
00524       printf("\n");
00525       printf("--------- ACCEPT TABLE [%d items] ---------\n", accept_table->num_items);
00526       hash_dump(accept_table, dump_component);
00527       printf("------------------------------------------\n");
00528       printf("\n");
00529 
00530       pthread_mutex_unlock(&accept_tab_mutex);
00531     }
00532 
00533     if(pthread_mutex_lock(removed_components_queue->mutex) == 0) {
00534 
00535       /* free every component in the queue */
00536 
00537       while((component = (COMPONENT *)dequeue(removed_components_queue))) {
00538         destroy_queue(component->msg_queue);
00539         free(component);
00540       }
00541 
00542       pthread_mutex_unlock(removed_components_queue->mutex);
00543     } else {
00544       fprintf(stderr, "Warning: couldn't lock removed components queue.\n");
00545     }
00546   }
00547 
00548   return NULL;
00549 }
00550 
00560 void *
00561 proxy_generic_conn_handler(void *arg)
00562 {
00563   int s, n;
00564   proxy_tag_t tag;
00565 
00566   s = *((int *) arg);
00567   free(arg);
00568 
00569   n = proxy_tread(s, (void *) &tag, 1, PROXY_TIMEOUT_DEFAULT);
00570 
00571   if (n <= 0) {
00572     fprintf(stderr, "Warning: error reading tag.  Closing connection.\n");
00573     close(s);
00574     return NULL;
00575   }
00576 
00577   printf("tag = %d\n", tag);
00578 
00579   switch (tag) {
00580 
00581     case PROXY_CONTROL_CONNECTION:
00582       proxy_handle_control_connection(s);
00583       break;
00584 
00585     case PROXY_CONNECT:
00586       proxy_handle_connection_request(s);
00587       break;
00588 
00589     /* this case is for testing purposes.. remove later. */
00590     case 'Z':
00591       fprintf(stderr, "TERMINATING...\n");
00592       exit(EXIT_SUCCESS);
00593       break;
00594 
00595     default:
00596       fprintf(stderr, "Warning: unknown tag %d\n", tag);
00597       close(s);
00598       break;
00599   }
00600 
00601   return NULL;
00602 }
00603 
00611 COMPONENT *
00612 proxy_new_component()
00613 {
00614   COMPONENT *new;
00615 
00616   new = (COMPONENT *) malloc(sizeof(COMPONENT));
00617 
00618   if (!new)
00619     return NULL;
00620 
00621   new->id.id[0] = '\0';
00622   new->port = 0;
00623   new->sockfd = 0;
00624   new->to_be_freed = FALSE;
00625   new->msg_queue = NULL;
00626 
00627   return new;
00628 }
00629 
00645 COMPONENT *
00646 proxy_get_request_header(int connfd, CID * client_id)
00647 {
00648   int id_size, port_size, to;
00649   COMPONENT *component;
00650   in_port_t dest_port;
00651   HASHNODE *ht;
00652   CID dest_id;
00653   char component_tag[sizeof(CID) + sizeof(dest_port) + 1];
00654 
00655   to = PROXY_TIMEOUT_DEFAULT;
00656   id_size = sizeof(client_id->id);
00657   port_size = sizeof(dest_port);
00658 
00659   if (proxy_tread(connfd, (void *) client_id->id, id_size, to) <= 0 ||
00660       proxy_tread(connfd, (void *) dest_id.id, id_size, to) <= 0 ||
00661       proxy_tread(connfd, (void *) &dest_port, port_size, to) <= 0) {
00662     fprintf(stderr, "Error reading IDs.  Aborting connection.\n");
00663     return NULL;
00664   }
00665 
00666   /* component_tag is composed of CID/port */
00667   memcpy(component_tag, dest_id.id, id_size);
00668   memcpy(component_tag + id_size, &dest_port, port_size);
00669   *(component_tag + id_size + port_size) = '\0';
00670 
00671   proxy_print_component_tag(stdout, "CONNECT REQ dest = ", 
00672     component_tag, NULL);
00673 
00676   if (pthread_mutex_lock(&accept_tab_mutex) == 0) {
00677     ht = hash_lookup(accept_table, (void *) component_tag);
00678 
00679     if (!ht) {
00680       proxy_tag_t response = PROXY_CONNECTION_REFUSED;
00681 
00682       fprintf(stderr, "Component entry not found!\n");
00683       pthread_mutex_unlock(&accept_tab_mutex);
00684       write(connfd, &response, sizeof(response));
00685       return NULL;
00686     }
00687 
00688     component = (COMPONENT *) (ht->item);
00689 
00690     printf("found entry.. socket desc = %d\n", component->sockfd);
00691 
00692     pthread_mutex_unlock(&accept_tab_mutex);
00693   } else {
00694     fprintf(stderr, "proxy_get_request_header(): Error locking.\n");
00695     return NULL;
00696   }
00697 
00698   return component;
00699 }
00700 
00718 int 
00719 proxy_send_cid_to_control_conn(int s, COMPONENT * c, CID client_id, 
00720   in_port_t port)
00721 {
00722   /* must enqueue request to control connection and signal notempty */
00723 
00724   printf("waiting for msg queue lock\n");
00725 
00726   if (pthread_mutex_lock(c->msg_queue->mutex) == 0) {
00727     COMPONENT *client_component;
00728 
00729     client_component = proxy_new_component();
00730 
00731     /* make sure this component isn't destined to be freed */
00732     if (c->to_be_freed || !client_component) {
00733       proxy_tag_t response = PROXY_CONNECTION_REFUSED;
00734 
00735       if (client_component)
00736     free(client_component);
00737       pthread_mutex_unlock(c->msg_queue->mutex);
00738       write(s, &response, sizeof(response));
00739       return -1;
00740     }
00741 
00742     memcpy(client_component->id.id, client_id.id, sizeof(client_id.id));
00743     client_component->port = port;
00744 
00745     printf("putting component in queue\n");
00746 
00747     enqueue(c->msg_queue, (void *) client_component);
00748     pthread_cond_signal(c->msg_queue->notEmpty);
00749     pthread_mutex_unlock(c->msg_queue->mutex);
00750   } else {
00751     fprintf(stderr, "Error locking.\n");
00752     return -1;
00753   }
00754 
00755   return 0;
00756 }
00757 
00774 int 
00775 proxy_wait_for_server_conn(int connfd, int sock2)
00776 {
00777   int maxfd, nready;
00778   fd_set allset, rset;
00779   struct timeval tv;
00780   int newfd = -1;
00781   struct sockaddr_in cliaddr;
00782   socklen_t clilen;
00783 
00784   /* now listen for incoming connection from server behind nat */
00785 
00786   maxfd = sock2;
00787   FD_ZERO(&allset);
00788   FD_SET(sock2, &allset);
00789 
00790   rset = allset;
00791   tv.tv_sec = 15;
00792   tv.tv_usec = 0;
00793 
00794   printf("waiting for 2nd connection\n");
00795   nready = select(maxfd + 1, &rset, NULL, NULL, &tv);
00796 
00797   if (nready <= 0) {
00798     proxy_tag_t response = PROXY_CONNECTION_REFUSED;
00799 
00800     printf("select error %d (0 means timed out)\n", nready);
00801     write(connfd, &response, sizeof(response));
00802     return -1;
00803   }
00804 
00805   if (FD_ISSET(sock2, &rset)) {
00806     clilen = sizeof(cliaddr);
00807 
00808     printf("something happened on the second listening socket!\n");
00809 
00810     newfd = accept(sock2, (struct sockaddr *) &cliaddr, &clilen);
00811 
00812     printf("after accept, new fd = %d\n", newfd);
00813 
00814     if (proxy_auth(newfd) < 0) {
00815       fprintf(stderr, "Warning: failed to authenticate client.\n");
00816       close(newfd);
00817     }
00818   }
00819 
00820   return newfd;
00821 }
00822 
00834 int 
00835 proxy_handle_data_transfer(int client_fd, int serv_fd)
00836 {
00837   int maxfd, nready;
00838   fd_set allset, rset;
00839 
00840   maxfd = serv_fd > client_fd ? serv_fd : client_fd;
00841 
00842   FD_ZERO(&allset);
00843   FD_SET(serv_fd, &allset);
00844   FD_SET(client_fd, &allset);
00845 
00846   for (;;) {
00847     int n;
00848     char buf[PROXY_XFER_CHUNKSIZE];
00849 
00850     rset = allset;
00851 
00852     nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
00853 
00854     if (nready < 0) {
00855 
00856       if (errno == EINTR)
00857     continue;
00858 
00859       return -1;
00860     }
00861 
00862     if (nready == 0)
00863       continue;
00864 
00865     if (FD_ISSET(client_fd, &rset)) {
00866       n = proxy_read_timeout(client_fd, buf, PROXY_XFER_CHUNKSIZE, 
00867         PROXY_TIMEOUT_DEFAULT);
00868 
00869       if (n <= 0) {
00870     printf("seems the requesting host (client) broke connection\n");
00871     break;
00872       }
00873 
00874       n = write(serv_fd, buf, n);
00875     } else if (FD_ISSET(serv_fd, &rset)) {
00876       n = proxy_read_timeout(serv_fd, buf, PROXY_XFER_CHUNKSIZE, 
00877         PROXY_TIMEOUT_DEFAULT);
00878 
00879       if (n <= 0) {
00880     printf("seems the proxied host (server) broke connection\n");
00881     break;
00882       }
00883 
00884       n = write(client_fd, buf, n);
00885     }
00886   }
00887 
00888   return 0;
00889 }
00890 
00906 int 
00907 proxy_check_client_match(int data_conn_fd, int client_fd, 
00908   in_port_t port, CID client_id)
00909 {
00910   proxy_tag_t response = PROXY_CONNECTION_ACCEPTED;
00911   in_port_t new_port;
00912   CID new_cid;
00913   int n;
00914 
00915   printf("sending connection accepted to client\n");
00916   n = write(client_fd, &response, sizeof(response));
00917 
00918   if (n <= 0) {
00919     fprintf(stderr, "Error writing response to client\n");
00920     return -1;
00921   }
00922 
00923   printf("getting response\n");
00924 
00925   n = proxy_tread(data_conn_fd, (char *)&response, sizeof(response), 
00926     PROXY_TIMEOUT_DEFAULT);
00927   if (n <= 0) {
00928     fprintf(stderr, "Error getting response from client\n");
00929     return -1;
00930   }
00931 
00932   if (response != PROXY_CONNECT_REPLY) {
00933     fprintf(stderr, "Didn't get the expected tag!!\n");
00934     return -1;
00935   }
00936 
00937   printf("getting cid\n");
00938   n = proxy_tread(data_conn_fd, (char *)&(new_cid.id), sizeof(new_cid.id), 
00939     PROXY_TIMEOUT_DEFAULT);
00940   if (n <= 0) {
00941     fprintf(stderr, "Error getting ID from client\n");
00942     return -1;
00943   }
00944 
00945   printf("getting port\n");
00946   n = proxy_tread(data_conn_fd, (char *)&new_port, sizeof(new_port), 
00947     PROXY_TIMEOUT_DEFAULT);
00948   if (n <= 0) {
00949     fprintf(stderr, "Error getting port from client\n");
00950     return -1;
00951   }
00952 
00953   /* check for a bad match */
00954   if (memcmp(client_id.id, new_cid.id, sizeof(new_cid.id))
00955       || (port != new_port)) {
00956     fprintf(stderr, "BAD MATCH!!\n");
00957     response = PROXY_CONNECTION_REFUSED;
00958     write(data_conn_fd, &response, sizeof(response));
00959 
00960     return -1;
00961   }
00962 
00963   printf("IDs match, now sending connection accepted msg to server.\n");
00964 
00965   /* found a match.. send connection accepted to server */
00966   response = PROXY_CONNECTION_ACCEPTED;
00967   n = write(data_conn_fd, &response, sizeof(response));
00968   if (n <= 0) {
00969     fprintf(stderr, "Error writing response to client\n");
00970     return -1;
00971   }
00972 
00973   return 0;
00974 }
00975 
00985 int 
00986 proxy_handle_connection_request(int connfd)
00987 {
00988   int new_incoming_sock, data_conn_fd;
00989   struct sockaddr_in tcp_serv;
00990   CID client_id;
00991   COMPONENT *component;
00992 
00993   if (proxy_block_sigpipe() < 0)
00994     return -1;
00995 
00996   component = proxy_get_request_header(connfd, &client_id);
00997 
00998   if (!component) {
00999     fprintf(stderr, "Error getting component header.  Aborting connection.\n");
01000     close(connfd);
01001     return -1;
01002   }
01003 
01004   if (proxy_init_sock(&new_incoming_sock, &tcp_serv, "tcp", FALSE, 0) < 0) {
01005     fprintf(stderr, "Error creating new incoming socket.\n");
01006     close(connfd);
01007     return -1;
01008   }
01009 
01010   printf("new port  -->  %d\n", ntohs(tcp_serv.sin_port));
01011 
01012   if (listen(new_incoming_sock, 5) < 0) {
01013     proxy_tag_t response = PROXY_CONNECTION_REFUSED;
01014 
01015     write(connfd, &response, sizeof(response));
01016     close(connfd);
01017     return -1;
01018   }
01019 
01020   if (proxy_send_cid_to_control_conn(connfd, component, client_id, tcp_serv.sin_port) < 0) {
01021     fprintf(stderr, "Error queueing control id.\n");
01022     close(connfd);
01023     return -1;
01024   }
01025 
01026   data_conn_fd = proxy_wait_for_server_conn(connfd, new_incoming_sock);
01027 
01028   if (data_conn_fd > 0) {
01029     if (proxy_check_client_match(data_conn_fd, connfd, tcp_serv.sin_port, client_id) < 0) {
01030       printf("bad match\n");
01031     } else
01032       proxy_handle_data_transfer(connfd, data_conn_fd);
01033 
01034     close(data_conn_fd);
01035   }
01036 
01037   close(connfd);
01038   close(new_incoming_sock);
01039 
01040   return 0;
01041 }
01042 
01054 COMPONENT *
01055 proxy_init_server_component(int connfd, char *component_tag)
01056 {
01057   COMPONENT *component;
01058   CID server_id;
01059   in_port_t server_port;
01060   int id_size, to;
01061   int port_size;
01062 
01063   to = PROXY_TIMEOUT_DEFAULT;
01064   id_size = sizeof(server_id.id);
01065   port_size = sizeof(server_port);
01066 
01067   component = proxy_new_component();
01068   if (component)
01069     component->msg_queue = new_queue();
01070 
01071   if (!component || !component->msg_queue) {
01072     fprintf(stderr, "Cannot malloc, aborting connection\n");
01073     if (component) {
01074       if (component->msg_queue)
01075     destroy_queue(component->msg_queue);
01076       free(component);
01077     }
01078     return NULL;
01079   }
01080 
01081   printf("id size = %d, port size = %d\n", id_size, port_size);
01082 
01083   if (proxy_tread(connfd, (void *) server_id.id, id_size, to) < 0 ||
01084       proxy_tread(connfd, (void *) &server_port, port_size, to) < 0) {
01085     if (component->msg_queue)
01086       destroy_queue(component->msg_queue);
01087     free(component);
01088     return NULL;
01089   }
01090 
01091   printf("port = %d\n", ntohs(server_port));
01092 
01093   /* component_tag is composed of CID/port */
01094   memcpy(component_tag, server_id.id, id_size);
01095   memcpy(component_tag + id_size, &server_port, port_size);
01096   *(component_tag + id_size + port_size) = '\0';
01097 
01098   proxy_print_component_tag(stdout, "component_tag =", component_tag, NULL);
01099 
01100   component->id = server_id;
01101   component->port = server_port;
01102   component->sockfd = connfd;
01103 
01104   if (pthread_mutex_lock(&accept_tab_mutex) == 0) {
01105     HASHNODE *ht;
01106 
01107     ht = hash_lookup(accept_table, (void *) component_tag);
01108 
01109     if(ht) {
01110       proxy_print_component_tag(stderr, "Component ", component_tag,
01111         " already exists in the table!\n");
01112       pthread_mutex_unlock(&accept_tab_mutex);
01113       return NULL;
01114     }
01115 
01116     hash_insert(accept_table, (void *) component, (void *) component_tag);
01117     pthread_mutex_unlock(&accept_tab_mutex);
01118   } else {
01119     if (component->msg_queue)
01120       destroy_queue(component->msg_queue);
01121     free(component);
01122     return NULL;
01123   }
01124 
01125   return component;
01126 }
01127 
01141 COMPONENT *
01142 proxy_wait_for_queue_message(int connfd, COMPONENT * component)
01143 {
01144   COMPONENT *item;
01145   int keepalive_timer;
01146 
01147   keepalive_timer = PROXY_KEEPALIVE_FREQ;
01148 
01149   if (pthread_mutex_lock(component->msg_queue->mutex) == 0) {
01150     while (is_empty(component->msg_queue)) {
01151       struct timespec timeout;
01152       struct timeval tv;
01153       int rv;
01154 
01155       gettimeofday(&tv, NULL);
01156 
01157       timeout.tv_sec = tv.tv_sec + PROXY_MSG_QUEUE_FREQ;
01158       timeout.tv_nsec = 0;
01159 
01160       /* wait until a message is placed in the queue or until the timeout elapses. */
01161       rv = pthread_cond_timedwait(component->msg_queue->notEmpty,
01162                   component->msg_queue->mutex, &timeout);
01163 
01164       keepalive_timer -= PROXY_MSG_QUEUE_FREQ;
01165 
01166       if (rv != 0) {
01167         int remove_srv = 0;
01168 
01169     /* if we timed out, check if the server is still there. */
01170 
01171         if(keepalive_timer <= 0) {
01172           keepalive_timer = PROXY_KEEPALIVE_FREQ;
01173       remove_srv = (proxy_send_keepalive(connfd) < 0);
01174         }
01175         else {
01176       remove_srv = (proxy_is_something_on_socket(connfd) == 0);
01177         }
01178 
01179     if(remove_srv) {
01180       char component_tag[sizeof(CID) + sizeof(in_port_t) + 1];
01181       int id_size = sizeof(component->id);
01182       int port_size = sizeof(component->port);
01183 
01184       printf("server must have gone away!\n");
01185 
01186       /* component_tag is composed of CID/port */
01187       memcpy(component_tag, &(component->id), id_size);
01188       memcpy(component_tag + id_size, &(component->port), port_size);
01189       *(component_tag + id_size + port_size) = '\0';
01190 
01191       proxy_delete_server_tag(component_tag, component);
01192           close(component->sockfd);
01193 
01194       return NULL;
01195     }
01196       }
01197     }
01198     item = (COMPONENT *) dequeue(component->msg_queue);
01199     pthread_mutex_unlock(component->msg_queue->mutex);
01200   } else
01201     return NULL;
01202 
01203   return item;
01204 }
01205 
01221 int
01222 proxy_send_conn_request_to_server(int connfd, char *component_tag,
01223                 COMPONENT * component, COMPONENT * item)
01224 {
01225   proxy_tag_t response = PROXY_CONNECT_REQUEST;
01226 
01227   printf("going to write to the server now (port = %d)...\n", ntohs(item->port));
01228   if (write(connfd, &response, sizeof(response)) < 0 ||
01229       write(connfd, item->id.id, sizeof(item->id.id)) < 0 ||
01230       write(connfd, &item->port, sizeof(item->port)) < 0) {
01231     fprintf(stderr, "Can't write to server, aborting.\n");
01232 
01233     proxy_delete_server_tag(component_tag, component);
01234 
01235     return -1;
01236   }
01237 
01238   return 0;
01239 }
01240 
01251 int 
01252 proxy_delete_server_tag(char *component_tag, COMPONENT * component)
01253 {
01254 
01255   proxy_print_component_tag(stdout, "going to attempt to delete component: ",
01256     component_tag, NULL);
01257 
01258   if (pthread_mutex_lock(&accept_tab_mutex) == 0) {
01259     HASHNODE *ht;
01260 
01261     printf("going to delete component_tag now...\n");
01262     ht = hash_delete(accept_table, (void *) component_tag);
01263 
01264     if (!ht)
01265       printf("warning: expected to find component in hash table\n");
01266     else if (component != (COMPONENT *) (ht->item))
01267       printf("warning: wrong component! \n");
01268 
01269     if(ht) free(ht);
01270     pthread_mutex_unlock(&accept_tab_mutex);
01271   } else
01272     return -1;
01273 
01274   component->to_be_freed = TRUE;
01275 
01276   /* put component in queue to be freed later */
01277 
01278   if (pthread_mutex_lock(removed_components_queue->mutex) == 0) {
01279 
01280     printf("putting component in removal queue\n");
01281 
01282     enqueue(removed_components_queue, (void *) component);
01283     pthread_mutex_unlock(removed_components_queue->mutex);
01284   } else {
01285     fprintf(stderr, "Warning: couldn't lock removed components queue.\n");
01286   }
01287 
01288   return 0;
01289 }
01290 
01300 int 
01301 proxy_handle_control_connection(int connfd)
01302 {
01303   COMPONENT *component;
01304   char component_tag[sizeof(CID) + sizeof(in_port_t) + 1];
01305 
01306   if (proxy_block_sigpipe() < 0)
01307     return -1;
01308 
01309   printf("i am a thread handling a control connection %d\n", connfd);
01310 
01311   component = proxy_init_server_component(connfd, component_tag);
01312   if (!component) {
01313     fprintf(stderr, "Error intializing server component.\n");
01314     close(connfd);
01315     return -1;
01316   }
01317 
01318   for (;;) {
01319     COMPONENT *item;
01320 
01321     item = proxy_wait_for_queue_message(connfd, component);
01322 
01323     if (!item) {
01324       close(connfd);
01325       return -1;
01326     }
01327 
01328     if (proxy_send_conn_request_to_server(connfd, component_tag, component, item) < 0) {
01329       free(item);
01330       close(connfd);
01331       return -1;
01332     }
01333 
01334     /* free the queue item.  the msg_queue field isn't initialized for queue items, so
01335      * there's no need to dispose of it here. */
01336     free(item);
01337   }
01338 
01339   return 0;
01340 }
01341 
/**
 * Creates an IPv4 socket bound to INADDR_ANY on the given port.
 *
 * SO_REUSEADDR is set on the socket; TCP sockets additionally get
 * TCP_NODELAY (best effort).  After binding, *serv is refreshed with
 * getsockname(), so when port is 0 the caller can read the port the
 * system chose from serv->sin_port.
 *
 * @param sfd    out: the new socket descriptor, or -1 if setup failed
 *               after the socket had been created
 * @param serv   out: the address the socket is bound to
 * @param proto  "udp" selects a datagram socket; anything else selects a
 *               stream (TCP) socket
 * @param nb     non-zero to put the socket into non-blocking mode
 * @param port   port to bind to in host byte order, 0 for any free port
 *
 * @return 0 on success; a negative value on failure, with errno left as
 *         set by the call that failed and no descriptor left open
 */
int 
proxy_init_sock(int *sfd, struct sockaddr_in *serv, char *proto, int nb, int port)
{
  const int istcp = (strcmp(proto, "udp") != 0);
  int flag = 1;
  socklen_t namelen;
  int fd, r, saved_errno;

  fd = socket(AF_INET, istcp ? SOCK_STREAM : SOCK_DGRAM, 0);
  *sfd = fd;
  if (fd < 0)
    return fd;

  if ((r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof(flag))) < 0)
    goto fail;

  if(istcp)
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));

  /* start from a clean address so no stale bytes reach bind() */
  memset(serv, 0, sizeof(*serv));
  serv->sin_family = AF_INET;
  serv->sin_addr.s_addr = htonl(INADDR_ANY);
  serv->sin_port = htons(port);

  if ((r = bind(fd, (struct sockaddr *) serv, sizeof(*serv))) < 0)
    goto fail;

  namelen = sizeof(*serv);
  if ((r = getsockname(fd, (struct sockaddr *) serv, &namelen)) < 0)
    goto fail;

  if (nb) {
    /* add O_NONBLOCK to the existing status flags instead of replacing them */
    int fl = fcntl(fd, F_GETFL, 0);

    r = (fl < 0) ? fl : fcntl(fd, F_SETFL, fl | O_NONBLOCK);
    if (r < 0)
      goto fail;
  }

  return 0;

fail:
  /* callers report the failure with perror(), so keep errno intact across
   * the close() that releases the half-configured socket */
  saved_errno = errno;
  close(fd);
  *sfd = -1;
  errno = saved_errno;

  return r;
}
01391 
#ifdef KERBEROS5
/**
 * Checks that the environment variables GRIDSOLVE_KEYTAB and
 * GRIDSOLVE_USERS are both defined.  Only their presence is checked, not
 * their contents.
 *
 * @return 0 if both are defined; otherwise -1, after naming the first
 *         missing variable on stderr
 */
int 
proxy_check_krb5_envvars()
{
  const char *keytab = getenv("GRIDSOLVE_KEYTAB");
  const char *users;

  if (!keytab) {
    fprintf(stderr, "Environment variable GRIDSOLVE_KEYTAB not defined\n");
    return -1;
  }

  users = getenv("GRIDSOLVE_USERS");

  if (!users) {
    fprintf(stderr, "Environment variable GRIDSOLVE_USERS not defined\n");
    return -1;
  }

  return 0;
}
#endif